Browse Source

[DSIP-59] remove single task definition code and ui (#16397)

* dsip 59

* fix e2e test
dev
xiangzihao 4 months ago committed by GitHub
parent
commit
a7c8d4d5ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 25
      docs/docs/en/guide/project/task-definition.md
  2. 22
      docs/docs/zh/guide/project/task-definition.md
  3. BIN
      docs/img/new_ui/dev/project/batch-task-definition.png
  4. BIN
      docs/img/new_ui/dev/project/stream-task-definition.png
  5. BIN
      docs/img/new_ui/dev/project/stream-task-execute.png
  6. 154
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
  7. 66
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskDefinitionV2Controller.java
  8. 97
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
  9. 371
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  10. 279
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  11. 17
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
  12. 15
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
  13. 2
      dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/ProjectDetailPage.java
  14. 5
      dolphinscheduler-ui/src/layouts/content/use-dataList.ts
  15. 1
      dolphinscheduler-ui/src/locales/en_US/menu.ts
  16. 1
      dolphinscheduler-ui/src/locales/zh_CN/menu.ts
  17. 11
      dolphinscheduler-ui/src/router/modules/projects.ts
  18. 134
      dolphinscheduler-ui/src/service/modules/task-definition/index.ts
  19. 135
      dolphinscheduler-ui/src/service/modules/task-definition/types.ts
  20. 229
      dolphinscheduler-ui/src/views/projects/task/definition/batch-task.tsx
  21. 272
      dolphinscheduler-ui/src/views/projects/task/definition/components/start-modal.tsx
  22. 156
      dolphinscheduler-ui/src/views/projects/task/definition/components/use-start.tsx
  23. 213
      dolphinscheduler-ui/src/views/projects/task/definition/components/use-version.ts
  24. 109
      dolphinscheduler-ui/src/views/projects/task/definition/components/version-modal.tsx
  25. 41
      dolphinscheduler-ui/src/views/projects/task/definition/index.tsx
  26. 152
      dolphinscheduler-ui/src/views/projects/task/definition/stream-task.tsx
  27. 40
      dolphinscheduler-ui/src/views/projects/task/definition/types.ts
  28. 202
      dolphinscheduler-ui/src/views/projects/task/definition/use-stream-table.ts
  29. 348
      dolphinscheduler-ui/src/views/projects/task/definition/use-table.ts
  30. 140
      dolphinscheduler-ui/src/views/projects/task/definition/use-task.ts

25
docs/docs/en/guide/project/task-definition.md

@ -1,25 +0,0 @@
# Task Definition
## Batch Task Definition
Task definition allows to modify or operate tasks at the task level rather than modifying them in the workflow definition.
We already have workflow level task editor in [workflow definition](workflow-definition.md) which you can click the specific
workflow and then edit its task definition. It is depressing when you want to edit the task definition but do not remember
which workflow it belongs to. So we decide to add `Task Definition` view under `Task` menu.
![task-definition](../../../../img/new_ui/dev/project/batch-task-definition.png)
In this view, you can create, query, update, delete task definition by click the related button in `operation` column. The
most exciting thing is you could query task by task name in the wildcard, and it is useful when you only remember the task
name but forget which workflow it belongs to. It is also supported query by the task name alone with `Task Type` or
`Workflow Name`
## Stream Task Definition
Stream task definitions are created in the workflow definition, and can be modified and executed.
![task-definition](../../../../img/new_ui/dev/project/stream-task-definition.png)
Click the execute button, check the execution parameters and click Confirm to submit the stream task.
![task-definition](../../../../img/new_ui/dev/project/stream-task-execute.png)

22
docs/docs/zh/guide/project/task-definition.md

@ -1,22 +0,0 @@
# 任务定义
## 批量任务定义
批量任务定义允许您在基于任务级别而不是在工作流中操作修改任务。再此之前,我们已经有了工作流级别的任务编辑器,你可以在[工作流定义](workflow-definition.md)
单击特定的工作流,然后编辑任务的定义。当您想编辑特定的任务定义但不记得它属于哪个工作流时,这是令人沮丧的。所以我们决定在 `任务` 菜单下添加 `任务定义` 视图。
![task-definition](../../../../img/new_ui/dev/project/batch-task-definition.png)
在该视图中,您可以通过单击 `操作` 列中的相关按钮来进行创建、查询、更新、删除任务定义。最令人兴奋的是您可以通过通配符进行全部任务查询,当您只
记得任务名称但忘记它属于哪个工作流时是非常有用的。也支持通过任务名称结合使用 `任务类型``工作流程名称` 进行查询。
## 实时任务定义
实时任务定义在工作流定义中创建,在任务定义页面可以进行修改和执行。
![task-definition](../../../../img/new_ui/dev/project/stream-task-definition.png)
点击实时任务执行,检查执行参数后点击确认,即可提交实时任务。
![task-definition](../../../../img/new_ui/dev/project/stream-task-execute.png)

BIN
docs/img/new_ui/dev/project/batch-task-definition.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 71 KiB

BIN
docs/img/new_ui/dev/project/stream-task-definition.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 43 KiB

BIN
docs/img/new_ui/dev/project/stream-task-execute.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 65 KiB

154
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java

@ -17,12 +17,9 @@
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_DEFINE_BY_CODE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_DEFINITION_VERSION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.LOGIN_USER_QUERY_PROJECT_LIST_PAGING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DETAIL_OF_TASK_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_DEFINITION_LIST_PAGING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_DEFINITION_VERSIONS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.RELEASE_TASK_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.SWITCH_TASK_DEFINITION_VERSION_ERROR;
@ -30,17 +27,12 @@ import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_DEFINITIO
import org.apache.dolphinscheduler.api.audit.OperatorLog;
import org.apache.dolphinscheduler.api.audit.enums.AuditType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
@ -74,90 +66,6 @@ public class TaskDefinitionController extends BaseController {
@Autowired
private TaskDefinitionService taskDefinitionService;
/**
* create task definition
*
* @param loginUser login user
* @param projectCode project code
* @param taskDefinitionJson task definition json
* @return create result code
*/
@Operation(summary = "save", description = "CREATE_TASK_DEFINITION_NOTES")
@Parameters({
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = long.class)),
@Parameter(name = "taskDefinitionJson", description = "TASK_DEFINITION_JSON", required = true, schema = @Schema(implementation = String.class))
})
@PostMapping()
@ResponseStatus(HttpStatus.CREATED)
@ApiException(CREATE_TASK_DEFINITION_ERROR)
@OperatorLog(auditType = AuditType.TASK_CREATE)
public Result createTaskDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson) {
Map<String, Object> result =
taskDefinitionService.createTaskDefinition(loginUser, projectCode, taskDefinitionJson);
return returnDataList(result);
}
/**
* create single task definition that binds the workflow
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param taskDefinitionJsonObj task definition json object
* @param upstreamCodes upstream task codes, sep comma
* @return create result code
*/
@Operation(summary = "saveSingle", description = "CREATE_SINGLE_TASK_DEFINITION_NOTES")
@Parameters({
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = long.class)),
@Parameter(name = "processDefinitionCode", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class)),
@Parameter(name = "taskDefinitionJsonObj", description = "TASK_DEFINITION_JSON", required = true, schema = @Schema(implementation = String.class)),
@Parameter(name = "upstreamCodes", description = "UPSTREAM_CODES", required = false, schema = @Schema(implementation = String.class))
})
@PostMapping("/save-single")
@ResponseStatus(HttpStatus.CREATED)
@ApiException(CREATE_TASK_DEFINITION_ERROR)
@OperatorLog(auditType = AuditType.TASK_CREATE)
public Result createTaskBindsWorkFlow(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "processDefinitionCode", required = true) long processDefinitionCode,
@RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj,
@RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) {
Map<String, Object> result = taskDefinitionService.createTaskBindsWorkFlow(loginUser, projectCode,
processDefinitionCode, taskDefinitionJsonObj, StringUtils.defaultString(upstreamCodes));
return returnDataList(result);
}
/**
* update task definition
*
* @param loginUser login user
* @param projectCode project code
* @param code task definition code
* @param taskDefinitionJsonObj task definition json object
* @return update result code
*/
@Operation(summary = "update", description = "UPDATE_TASK_DEFINITION_NOTES")
@Parameters({
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = long.class)),
@Parameter(name = "code", description = "TASK_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class, example = "1")),
@Parameter(name = "taskDefinitionJsonObj", description = "TASK_DEFINITION_JSON", required = true, schema = @Schema(implementation = String.class))
})
@PutMapping(value = "/{code}")
@ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_TASK_DEFINITION_ERROR)
@OperatorLog(auditType = AuditType.TASK_UPDATE)
public Result updateTaskDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "code") long code,
@RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj) {
Map<String, Object> result =
taskDefinitionService.updateTaskDefinition(loginUser, projectCode, code, taskDefinitionJsonObj);
return returnDataList(result);
}
/**
* update task definition
*
@ -271,29 +179,6 @@ public class TaskDefinitionController extends BaseController {
return returnDataList(result);
}
/**
* delete task definition by code
*
* @param loginUser login user
* @param projectCode project code
* @param code the task definition code
* @return delete result code
*/
@Operation(summary = "deleteTaskDefinition", description = "DELETE_TASK_DEFINITION_BY_CODE_NOTES")
@Parameters({
@Parameter(name = "code", description = "TASK_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class, example = "1"))
})
@DeleteMapping(value = "/{code}")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_TASK_DEFINE_BY_CODE_ERROR)
@OperatorLog(auditType = AuditType.TASK_DELETE)
public Result deleteTaskDefinitionByCode(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "code") long code) {
taskDefinitionService.deleteTaskDefinitionByCode(loginUser, code);
return new Result(Status.SUCCESS);
}
/**
* query detail of task definition by code
*
@ -316,45 +201,6 @@ public class TaskDefinitionController extends BaseController {
return returnDataList(result);
}
/**
* query task definition list paging
*
* @param loginUser login user
* @param projectCode project code
* @param searchWorkflowName searchWorkflowName
* @param searchTaskName searchTaskName
* @param taskType taskType
* @param taskExecuteType taskExecuteType
* @param pageNo page number
* @param pageSize page size
* @return task definition page
*/
@Operation(summary = "queryTaskDefinitionListPaging", description = "QUERY_TASK_DEFINITION_LIST_PAGING_NOTES")
@Parameters({
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = false, schema = @Schema(implementation = long.class)),
@Parameter(name = "searchWorkflowName", description = "SEARCH_WORKFLOW_NAME", required = false, schema = @Schema(implementation = String.class)),
@Parameter(name = "searchTaskName", description = "SEARCH_TASK_NAME", required = false, schema = @Schema(implementation = String.class)),
@Parameter(name = "taskType", description = "TASK_TYPE", required = false, schema = @Schema(implementation = String.class, example = "SHELL")),
@Parameter(name = "taskExecuteType", description = "TASK_EXECUTE_TYPE", required = false, schema = @Schema(implementation = TaskExecuteType.class, example = "STREAM")),
@Parameter(name = "pageNo", description = "PAGE_NO", required = true, schema = @Schema(implementation = int.class, example = "1")),
@Parameter(name = "pageSize", description = "PAGE_SIZE", required = true, schema = @Schema(implementation = int.class, example = "10"))
})
@GetMapping()
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_DEFINITION_LIST_PAGING_ERROR)
public Result queryTaskDefinitionListPaging(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "searchTaskName", required = false) String searchTaskName,
@RequestParam(value = "taskType", required = false) String taskType,
@RequestParam(value = "taskExecuteType", required = false, defaultValue = "BATCH") TaskExecuteType taskExecuteType,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
checkPageParams(pageNo, pageSize);
searchTaskName = ParameterUtils.handleEscapes(searchTaskName);
return taskDefinitionService.queryTaskDefinitionListPaging(loginUser, projectCode,
searchTaskName, taskType, taskExecuteType, pageNo, pageSize);
}
/**
* gen task code list
*

66
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskDefinitionV2Controller.java

@ -17,16 +17,11 @@
package org.apache.dolphinscheduler.api.controller.v2;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_SCHEDULE_BY_ID_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DETAIL_OF_TASK_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_LIST;
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_DEFINITION_ERROR;
import org.apache.dolphinscheduler.api.controller.BaseController;
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -37,11 +32,9 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@ -65,65 +58,6 @@ public class TaskDefinitionV2Controller extends BaseController {
@Autowired
private TaskDefinitionService taskDefinitionService;
/**
* Create resource task definition
*
* @param loginUser login user
* @param taskCreateRequest task definition json
* @return Result object created
*/
@Operation(summary = "create", description = "CREATE_TASK_DEFINITION_NOTES")
@PostMapping(consumes = {"application/json"})
@ResponseStatus(HttpStatus.CREATED)
@ApiException(CREATE_TASK_DEFINITION_ERROR)
public Result<TaskDefinition> createTaskDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestBody TaskCreateRequest taskCreateRequest) {
TaskDefinition taskDefinition = taskDefinitionService.createTaskDefinitionV2(loginUser, taskCreateRequest);
return Result.success(taskDefinition);
}
/**
* Delete resource task definition by code
*
* @param loginUser login user
* @param code task definition code
*/
@Operation(summary = "delete", description = "DELETE_TASK_DEFINITION_VERSION_NOTES")
@Parameters({
@Parameter(name = "code", description = "TASK_DEFINITION_CODE", schema = @Schema(implementation = long.class, example = "123456", required = true))
})
@DeleteMapping(value = "/{code}")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_SCHEDULE_BY_ID_ERROR)
public Result deleteTaskDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("code") Long code) {
taskDefinitionService.deleteTaskDefinitionByCode(loginUser, code);
return Result.success();
}
/**
* Update resource task definition by code
*
* @param loginUser login user
* @param code task code of resource you want to update
* @param taskUpdateRequest workflowUpdateRequest
* @return ResourceResponse object updated
*/
@Operation(summary = "update", description = "UPDATE_TASK_DEFINITION_NOTES")
@Parameters({
@Parameter(name = "code", description = "TASK_DEFINITION_CODE", schema = @Schema(implementation = long.class, example = "123456", required = true))
})
@PutMapping(value = "/{code}")
@ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_TASK_DEFINITION_ERROR)
public Result<TaskDefinition> updateTaskDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("code") Long code,
@RequestBody TaskUpdateRequest taskUpdateRequest) {
TaskDefinition taskDefinition =
taskDefinitionService.updateTaskDefinitionV2(loginUser, code, taskUpdateRequest);
return Result.success(taskDefinition);
}
/**
* Get resource task definition by code
*

97
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java

@ -17,13 +17,10 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
@ -34,43 +31,6 @@ import java.util.Map;
*/
public interface TaskDefinitionService {
/**
* create task definition
*
* @param loginUser login user
* @param projectCode project code
* @param taskDefinitionJson task definition json
*/
Map<String, Object> createTaskDefinition(User loginUser,
long projectCode,
String taskDefinitionJson);
/**
* Create resource task definition
*
* @param loginUser login user
* @param taskCreateRequest task definition json
* @Return new TaskDefinition have created
*/
TaskDefinition createTaskDefinitionV2(User loginUser,
TaskCreateRequest taskCreateRequest);
/**
* create single task definition that binds the workflow
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param taskDefinitionJsonObj task definition json object
* @param upstreamCodes upstream task codes, sep comma
* @return create result code
*/
Map<String, Object> createTaskBindsWorkFlow(User loginUser,
long projectCode,
long processDefinitionCode,
String taskDefinitionJsonObj,
String upstreamCodes);
/**
* query task definition
*
@ -84,43 +44,6 @@ public interface TaskDefinitionService {
long processCode,
String taskName);
/**
* Delete resource task definition by code
*
* Only task release state offline and no downstream tasks can be deleted, will also remove the exists
* task relation [upstreamTaskCode, taskCode]
*
* @param loginUser login user
* @param taskCode task code
*/
void deleteTaskDefinitionByCode(User loginUser,
long taskCode);
/**
* update task definition
*
* @param loginUser login user
* @param projectCode project code
* @param taskCode task code
* @param taskDefinitionJsonObj task definition json object
*/
Map<String, Object> updateTaskDefinition(User loginUser,
long projectCode,
long taskCode,
String taskDefinitionJsonObj);
/**
* Update resource task definition by code
*
* @param loginUser login user
* @param taskCode task code
* @param taskUpdateRequest task definition json object
* @return new TaskDefinition have updated
*/
TaskDefinition updateTaskDefinitionV2(User loginUser,
long taskCode,
TaskUpdateRequest taskUpdateRequest);
/**
* Get resource task definition by code
*
@ -212,26 +135,6 @@ public interface TaskDefinitionService {
long projectCode,
long taskCode);
/**
* query task definition list paging
*
* @param loginUser login user
* @param projectCode project code
* @param searchTaskName searchTaskName
* @param taskType taskType
* @param taskExecuteType taskExecuteType
* @param pageNo page number
* @param pageSize page size
* @return task definition page
*/
Result queryTaskDefinitionListPaging(User loginUser,
long projectCode,
String searchTaskName,
String taskType,
TaskExecuteType taskExecuteType,
Integer pageNo,
Integer pageSize);
/**
* gen task code list
*

371
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -18,17 +18,13 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_VERSION_VIEW;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
import static org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager.checkTaskParameters;
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
import org.apache.dolphinscheduler.api.enums.Status;
@ -139,54 +135,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
/**
* create task definition
*
* @param loginUser login user
* @param projectCode project code
* @param taskDefinitionJson task definition json
*/
@Transactional
@Override
public Map<String, Object> createTaskDefinition(User loginUser,
long projectCode,
String taskDefinitionJson) {
Project project = projectMapper.queryByCode(projectCode);
// check if user have write perm for project
Map<String, Object> result = new HashMap<>();
boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result);
if (!hasProjectAndWritePerm) {
return result;
}
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
log.warn("Parameter taskDefinitionJson is invalid.");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
return result;
}
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (!checkTaskParameters(taskDefinitionLog.getTaskType(), taskDefinitionLog.getTaskParams())) {
log.warn("Task definition {} parameters are invalid.", taskDefinitionLog.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return result;
}
}
int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, Boolean.TRUE);
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
log.error("Create task definition error, projectCode:{}.", projectCode);
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
Map<String, Object> resData = new HashMap<>();
resData.put("total", taskDefinitionLogs.size());
resData.put("code", StringUtils
.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()), ","));
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, resData);
return result;
}
private TaskDefinitionLog persist2TaskDefinitionLog(User user, TaskDefinition taskDefinition) {
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition);
taskDefinitionLog.setOperator(user.getId());
@ -226,171 +174,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
workflowUpdateRequest);
}
/**
* Create resource task definition
*
* @param loginUser login user
* @param taskCreateRequest task definition json
* @return new TaskDefinition have created
*/
@Override
@Transactional
public TaskDefinition createTaskDefinitionV2(User loginUser,
TaskCreateRequest taskCreateRequest) {
TaskDefinition taskDefinition = taskCreateRequest.convert2TaskDefinition();
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(taskCreateRequest.getWorkflowCode());
if (processDefinition == null) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, taskCreateRequest.getWorkflowCode());
}
// Add project code from process definition if not exists
if (taskDefinition.getProjectCode() == 0L) {
taskDefinition.setProjectCode(processDefinition.getProjectCode());
}
this.checkTaskDefinitionValid(loginUser, taskDefinition, TASK_DEFINITION_CREATE);
long taskDefinitionCode;
try {
taskDefinitionCode = CodeGenerateUtils.genCode();
} catch (CodeGenerateException e) {
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
}
taskDefinition.setCode(taskDefinitionCode);
int create = taskDefinitionMapper.insert(taskDefinition);
if (create <= 0) {
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
this.persist2TaskDefinitionLog(loginUser, taskDefinition);
// update related objects: task relationship, workflow's location(need to set to null and front-end will auto
// format it)
this.updateTaskUpstreams(loginUser, taskCreateRequest.getWorkflowCode(), taskDefinition.getCode(),
taskCreateRequest.getUpstreamTasksCodes());
this.updateWorkflowLocation(loginUser, processDefinition);
return taskDefinition;
}
/**
* create single task definition that binds the workflow
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param taskDefinitionJsonObj task definition json object
* @param upstreamCodes upstream task codes, sep comma
* @return create result code
*/
@Transactional
@Override
public Map<String, Object> createTaskBindsWorkFlow(User loginUser,
long projectCode,
long processDefinitionCode,
String taskDefinitionJsonObj,
String upstreamCodes) {
Project project = projectMapper.queryByCode(projectCode);
// check if user have write perm for project
Map<String, Object> result = new HashMap<>();
boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result);
if (!hasProjectAndWritePerm) {
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
log.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
return result;
}
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
log.warn("Task definition can not be created due to process definition is {}, processDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, String.valueOf(processDefinitionCode));
return result;
}
TaskDefinitionLog taskDefinition = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
if (taskDefinition == null) {
log.warn("Parameter taskDefinitionJsonObj is invalid json.");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
return result;
}
if (!checkTaskParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams())) {
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName());
return result;
}
long taskCode = taskDefinition.getCode();
if (taskCode == 0) {
taskDefinition.setCode(CodeGenerateUtils.genCode());
}
List<ProcessTaskRelationLog> processTaskRelationLogList =
processTaskRelationMapper.queryByProcessCode(processDefinitionCode)
.stream()
.map(ProcessTaskRelationLog::new)
.collect(Collectors.toList());
if (StringUtils.isNotBlank(upstreamCodes)) {
Set<Long> upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong)
.collect(Collectors.toSet());
List<TaskDefinition> upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
Set<Long> queryUpStreamTaskCodes =
upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet());
// upstreamTaskCodes - queryUpStreamTaskCodes
Set<Long> diffCode = upstreamTaskCodes.stream().filter(code -> !queryUpStreamTaskCodes.contains(code))
.collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(diffCode)) {
String taskCodes = StringUtils.join(diffCode, Constants.COMMA);
log.error("Some task definitions with parameter upstreamCodes do not exist, taskDefinitionCodes:{}.",
taskCodes);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCodes);
return result;
}
for (TaskDefinition upstreamTask : upstreamTaskDefinitionList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setPreTaskCode(upstreamTask.getCode());
processTaskRelationLog.setPreTaskVersion(upstreamTask.getVersion());
processTaskRelationLog.setPostTaskCode(taskCode);
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLog.setConditionType(ConditionType.NONE);
processTaskRelationLog.setConditionParams("{}");
processTaskRelationLogList.add(processTaskRelationLog);
}
} else {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setPreTaskCode(0);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLog.setPostTaskCode(taskCode);
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLog.setConditionType(ConditionType.NONE);
processTaskRelationLog.setConditionParams("{}");
processTaskRelationLogList.add(processTaskRelationLog);
}
int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(),
processDefinition.getVersion(),
processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE);
if (insertResult != Constants.EXIT_CODE_SUCCESS) {
log.error(
"Save new version process task relations error, processDefinitionCode:{}, processDefinitionVersion:{}.",
processDefinition.getCode(), processDefinition.getVersion());
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
} else
log.info(
"Save new version process task relations complete, processDefinitionCode:{}, processDefinitionVersion:{}.",
processDefinition.getCode(), processDefinition.getVersion());
int saveTaskResult =
processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE);
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
log.error("Save task definition error, projectCode:{}, taskDefinitionCode:{}.", projectCode,
taskDefinition.getCode());
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
} else
log.info("Save task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode,
taskDefinition.getCode());
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, taskDefinition);
return result;
}
/**
* query task definition
*
@ -453,45 +236,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
}
/**
* Delete resource task definition by code
* <p>
* Only task release state offline and no downstream tasks can be deleted, will also remove the exists
* task relation [upstreamTaskCode, taskCode]
*
* @param loginUser login user
* @param taskCode task code
*/
@Transactional
@Override
public void deleteTaskDefinitionByCode(User loginUser, long taskCode) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode);
}
this.taskCanDeleteValid(loginUser, taskDefinition, loginUser);
int delete = taskDefinitionMapper.deleteByCode(taskCode);
if (delete <= 0) {
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR, taskDefinition.getCode());
}
// Delete task upstream tasks if exists
List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(taskDefinition.getProjectCode(), taskCode);
if (CollectionUtils.isNotEmpty(taskRelationList)) {
log.debug(
"Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}.",
taskCode);
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
List<ProcessTaskRelation> relationList = processTaskRelations.stream()
.filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList());
updateDag(loginUser, processDefinitionCode, relationList, Lists.newArrayList());
}
}
public void updateDag(User loginUser, long processDefinitionCode,
List<ProcessTaskRelation> processTaskRelationList,
List<TaskDefinitionLog> taskDefinitionLogs) {
@ -525,42 +269,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
}
/**
* update task definition
*
* @param loginUser login user
* @param projectCode project code
* @param taskCode task code
* @param taskDefinitionJsonObj task definition json object
*/
@Transactional
@Override
public Map<String, Object> updateTaskDefinition(User loginUser, long projectCode, long taskCode,
String taskDefinitionJsonObj) {
Map<String, Object> result = new HashMap<>();
TaskDefinitionLog taskDefinitionToUpdate =
updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result);
if (taskDefinitionToUpdate == null) {
return result;
}
List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (CollectionUtils.isNotEmpty(taskRelationList)) {
log.info(
"Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.",
taskCode);
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
updateDag(loginUser, processDefinitionCode, processTaskRelations,
Lists.newArrayList(taskDefinitionToUpdate));
}
log.info("Update task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
return result;
}
private void TaskDefinitionUpdateValid(TaskDefinition taskDefinitionOriginal, TaskDefinition taskDefinitionUpdate) {
// Task already online
if (processService.isTaskOnline(taskDefinitionOriginal.getCode())
@ -583,52 +291,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
}
/**
* update task definition
*
* @param loginUser login user
* @param taskCode task code
* @param taskUpdateRequest task definition json object
* @return new TaskDefinition have updated
*/
@Transactional
@Override
public TaskDefinition updateTaskDefinitionV2(User loginUser,
long taskCode,
TaskUpdateRequest taskUpdateRequest) {
TaskDefinition taskDefinitionOriginal = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinitionOriginal == null) {
throw new ServiceException(Status.TASK_DEFINITION_NOT_EXISTS, taskCode);
}
TaskDefinition taskDefinitionUpdate = taskUpdateRequest.mergeIntoTaskDefinition(taskDefinitionOriginal);
this.checkTaskDefinitionValid(loginUser, taskDefinitionUpdate, TASK_DEFINITION_UPDATE);
this.TaskDefinitionUpdateValid(taskDefinitionOriginal, taskDefinitionUpdate);
int update = taskDefinitionMapper.updateById(taskDefinitionUpdate);
if (update <= 0) {
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
TaskDefinitionLog taskDefinitionLog = this.persist2TaskDefinitionLog(loginUser, taskDefinitionUpdate);
List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(taskDefinitionUpdate.getProjectCode(), taskCode);
if (CollectionUtils.isNotEmpty(taskRelationList)) {
log.info(
"Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.",
taskCode);
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionLog));
}
this.updateTaskUpstreams(loginUser, taskUpdateRequest.getWorkflowCode(), taskDefinitionUpdate.getCode(),
taskUpdateRequest.getUpstreamTasksCodes());
return taskDefinitionUpdate;
}
/**
* Get resource task definition by code
*
@ -1144,39 +806,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
@Override
public Result queryTaskDefinitionListPaging(User loginUser,
long projectCode,
String searchTaskName,
String taskType,
TaskExecuteType taskExecuteType,
Integer pageNo,
Integer pageSize) {
Result result = new Result();
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> checkResult =
projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
putMsg(result, resultStatus);
return result;
}
taskType = taskType == null ? StringUtils.EMPTY : taskType;
Page<TaskMainInfo> page = new Page<>(pageNo, pageSize);
// first, query task code by page size
IPage<TaskMainInfo> taskMainInfoIPage = taskDefinitionMapper.queryDefineListPaging(page, projectCode,
searchTaskName, taskType, taskExecuteType);
// then, query task relevant info by task code
fillRecords(projectCode, taskMainInfoIPage);
PageInfo<TaskMainInfo> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotal((int) taskMainInfoIPage.getTotal());
pageInfo.setTotalList(taskMainInfoIPage.getRecords());
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
private void fillRecords(long projectCode, IPage<TaskMainInfo> taskMainInfoIPage) {
List<TaskMainInfo> records = Collections.emptyList();
if (CollectionUtils.isNotEmpty(taskMainInfoIPage.getRecords())) {

279
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -17,12 +17,7 @@
package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThrow;
import static org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
@ -31,18 +26,12 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.TaskDefinitionServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -53,7 +42,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
@ -64,14 +52,12 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.shell.ShellTaskChannelFactory;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.process.ProcessServiceImpl;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -88,9 +74,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class TaskDefinitionServiceImplTest {
@ -161,80 +144,6 @@ public class TaskDefinitionServiceImplTest {
user = loginUser;
}
@Test
public void createTaskDefinition() {
try (
MockedStatic<TaskPluginManager> taskPluginManagerMockedStatic =
Mockito.mockStatic(TaskPluginManager.class)) {
taskPluginManagerMockedStatic
.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any(), Mockito.any()))
.thenReturn(true);
Project project = getProject();
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Map<String, Object> result = new HashMap<>();
when(projectService.hasProjectAndWritePerm(user, project, result))
.thenReturn(true);
String createTaskDefinitionJson =
"[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
Map<String, Object> relation = taskDefinitionService
.createTaskDefinition(user, PROJECT_CODE, createTaskDefinitionJson);
assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
}
@Test
public void updateTaskDefinition() {
try (
MockedStatic<TaskPluginManager> taskPluginManagerMockedStatic =
Mockito.mockStatic(TaskPluginManager.class)) {
taskPluginManagerMockedStatic
.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any(), Mockito.any()))
.thenReturn(true);
String taskDefinitionJson = getTaskDefinitionJson();
Project project = getProject();
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, PROJECT_CODE);
when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true);
when(processService.isTaskOnline(TASK_CODE)).thenReturn(Boolean.FALSE);
when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new TaskDefinition());
when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
when(processTaskRelationLogDao.insert(Mockito.any(ProcessTaskRelationLog.class))).thenReturn(1);
when(processDefinitionMapper.queryByCode(2L)).thenReturn(new ProcessDefinition());
when(processDefinitionMapper.updateById(Mockito.any(ProcessDefinition.class))).thenReturn(1);
when(processDefinitionLogMapper.insert(Mockito.any(ProcessDefinitionLog.class))).thenReturn(1);
when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
when(processTaskRelationMapper.queryProcessTaskRelationByTaskCodeAndTaskVersion(TASK_CODE, 0))
.thenReturn(getProcessTaskRelationList2());
when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson);
assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
// failure
when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(2);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE,
taskDefinitionJson));
assertEquals(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR.getCode(),
((ServiceException) exception).getCode());
}
}
@Test
public void queryTaskDefinitionByName() {
String taskName = "task";
@ -255,33 +164,6 @@ public class TaskDefinitionServiceImplTest {
assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
@Test
public void deleteTaskDefinitionByCode() {
Project project = getProject();
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
// error task definition not find
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE));
assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// error delete single task definition object
when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition());
when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(0);
when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE));
assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR.getCode(),
((ServiceException) exception).getCode());
// success
doNothing().when(projectService).checkProjectAndAuthThrowException(user, project,
TASK_DEFINITION_DELETE);
when(processTaskRelationMapper.queryDownstreamByTaskCode(TASK_CODE)).thenReturn(new ArrayList<>());
when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(1);
Assertions.assertDoesNotThrow(() -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE));
}
@Test
public void switchVersion() {
Project project = getProject();
@ -356,33 +238,6 @@ public class TaskDefinitionServiceImplTest {
assertEquals(Status.SUCCESS, genTaskCodeList.get(Constants.STATUS));
}
@Test
public void testQueryTaskDefinitionListPaging() {
Project project = getProject();
Map<String, Object> checkResult = new HashMap<>();
checkResult.put(Constants.STATUS, Status.SUCCESS);
Integer pageNo = 1;
Integer pageSize = 10;
IPage<TaskMainInfo> taskMainInfoIPage = new Page<>();
TaskMainInfo taskMainInfo = new TaskMainInfo();
taskMainInfo.setTaskCode(TASK_CODE);
taskMainInfo.setUpstreamTaskCode(4L);
taskMainInfo.setUpstreamTaskName("4");
taskMainInfoIPage.setRecords(Collections.singletonList(taskMainInfo));
taskMainInfoIPage.setTotal(10L);
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION))
.thenReturn(checkResult);
when(taskDefinitionMapper.queryDefineListPaging(Mockito.any(Page.class), Mockito.anyLong(),
Mockito.isNull(), Mockito.anyString(), Mockito.isNull()))
.thenReturn(taskMainInfoIPage);
when(taskDefinitionMapper.queryDefineListByCodeList(PROJECT_CODE, Collections.singletonList(3L)))
.thenReturn(Collections.singletonList(taskMainInfo));
Result result = taskDefinitionService.queryTaskDefinitionListPaging(user, PROJECT_CODE,
null, null, null, pageNo, pageSize);
assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
}
@Test
public void testReleaseTaskDefinition() {
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
@ -425,140 +280,6 @@ public class TaskDefinitionServiceImplTest {
assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS));
}
@Test
public void testCreateTaskDefinitionV2() {
TaskCreateRequest taskCreateRequest = new TaskCreateRequest();
taskCreateRequest.setProjectCode(PROJECT_CODE);
taskCreateRequest.setWorkflowCode(PROCESS_DEFINITION_CODE);
// error process definition not find
assertThrowsServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
// error project not find
when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition());
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
.checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_CREATE);
assertThrowsServiceException(Status.PROJECT_NOT_EXIST,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
// error task definition
taskCreateRequest.setTaskType(ShellTaskChannelFactory.NAME);
taskCreateRequest.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_CREATE);
assertThrowsServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
try (
MockedStatic<TaskPluginManager> taskPluginManagerMockedStatic =
Mockito.mockStatic(TaskPluginManager.class)) {
taskPluginManagerMockedStatic
.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any(), Mockito.any()))
.thenReturn(true);
// error create task definition object
when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(0);
assertThrowsServiceException(Status.CREATE_TASK_DEFINITION_ERROR,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
// error sync to task definition log
when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(1);
when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0);
assertThrowsServiceException(Status.CREATE_TASK_DEFINITION_LOG_ERROR,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
// success
when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
// we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
when(
processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class),
isA(Boolean.class),
isA(TaskRelationUpdateUpstreamRequest.class)))
.thenReturn(getProcessTaskRelationList());
when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class),
isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition());
assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
}
}
@Test
public void testUpdateTaskDefinitionV2() {
TaskUpdateRequest taskUpdateRequest = new TaskUpdateRequest();
TaskDefinition taskDefinition = getTaskDefinition();
Project project = getProject();
// error task definition not exists
assertThrowsServiceException(Status.TASK_DEFINITION_NOT_EXISTS,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
// error project not find
when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition);
when(projectMapper.queryByCode(isA(Long.class))).thenReturn(project);
doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
.checkProjectAndAuthThrowException(user, project, TASK_DEFINITION_UPDATE);
assertThrowsServiceException(Status.PROJECT_NOT_EXIST,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
// error task definition
doNothing().when(projectService).checkProjectAndAuthThrowException(user, project, TASK_DEFINITION_UPDATE);
try (
MockedStatic<TaskPluginManager> taskPluginManagerMockedStatic =
Mockito.mockStatic(TaskPluginManager.class)) {
taskPluginManagerMockedStatic
.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any(), Mockito.any()))
.thenReturn(false);
assertThrowsServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
}
try (
MockedStatic<TaskPluginManager> taskPluginManagerMockedStatic =
Mockito.mockStatic(TaskPluginManager.class)) {
taskPluginManagerMockedStatic
.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any(), Mockito.any()))
.thenReturn(true);
// error task definition nothing update
when(processService.isTaskOnline(TASK_CODE)).thenReturn(false);
assertThrowsServiceException(Status.TASK_DEFINITION_NOT_CHANGE,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
// error task definition version invalid
taskUpdateRequest.setTaskPriority(String.valueOf(Priority.HIGH));
assertThrowsServiceException(Status.DATA_IS_NOT_VALID,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
// error task definition update effect number
when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(VERSION);
when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(0);
assertThrowsServiceException(Status.UPDATE_TASK_DEFINITION_ERROR,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
// error task definition log insert
when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(1);
when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0);
assertThrowsServiceException(Status.CREATE_TASK_DEFINITION_LOG_ERROR,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
// success
when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
// we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
when(
processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class),
isA(Boolean.class),
isA(TaskRelationUpdateUpstreamRequest.class)))
.thenReturn(getProcessTaskRelationList());
Assertions.assertDoesNotThrow(
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
taskDefinition =
taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest);
assertEquals(getTaskDefinition().getVersion() + 1, taskDefinition.getVersion());
}
}
@Test
public void testUpdateDag() {
User loginUser = getLoginUser();

17
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
@ -88,22 +87,6 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
*/
int batchInsert(@Param("taskDefinitions") List<TaskDefinitionLog> taskDefinitions);
/**
* task main info page
*
* @param page page
* @param projectCode projectCode
* @param searchTaskName searchTaskName
* @param taskType taskType
* @param taskExecuteType taskExecuteType
* @return task main info IPage
*/
IPage<TaskMainInfo> queryDefineListPaging(IPage<TaskMainInfo> page,
@Param("projectCode") long projectCode,
@Param("searchTaskName") String searchTaskName,
@Param("taskType") String taskType,
@Param("taskExecuteType") TaskExecuteType taskExecuteType);
/**
* task main info
* @param projectCode project code

15
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -98,21 +98,6 @@
#{taskDefinition.taskGroupId}, #{taskDefinition.taskExecuteType})
</foreach>
</insert>
<select id="queryDefineListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.code task_code
from t_ds_task_definition td
WHERE td.project_code = #{projectCode}
<if test="taskType != ''">
and td.task_type = #{taskType}
</if>
<if test="taskExecuteType != null">
and td.task_execute_type = #{taskExecuteType.code}
</if>
<if test="searchTaskName != null and searchTaskName != ''">
and td.name like concat('%', #{searchTaskName}, '%')
</if>
order by td.update_time desc
</select>
<select id="queryDefineListByCodeList" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.name task_name,td.code task_code,td.version task_version,td.task_type,td.create_time task_create_time,td.update_time task_update_time,

2
dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/ProjectDetailPage.java

@ -42,7 +42,7 @@ public final class ProjectDetailPage extends NavBarPage {
@FindBy(css = ".tab-vertical .n-submenu:nth-of-type(2) .n-menu-item:nth-of-type(3) > .n-menu-item-content")
private WebElement menuProcessInstances;
@FindBy(css = ".tab-vertical .n-submenu:nth-of-type(3) .n-menu-item:nth-of-type(2) > .n-menu-item-content")
@FindBy(xpath = "//div[contains(@class, 'n-menu-item-content')]//div[contains(., 'Task Instance')]")
private WebElement menuTaskInstances;
public ProjectDetailPage(RemoteWebDriver driver) {

5
dolphinscheduler-ui/src/layouts/content/use-dataList.ts

@ -156,11 +156,6 @@ export function useDataList() {
key: 'task',
icon: renderIcon(SettingOutlined),
children: [
{
label: t('menu.task_definition'),
key: `/projects/${projectCode}/task/definitions`,
payload: { projectName: projectName }
},
{
label: t('menu.task_instance'),
key: `/projects/${projectCode}/task/instances`,

1
dolphinscheduler-ui/src/locales/en_US/menu.ts

@ -32,7 +32,6 @@ export default {
workflow_timing: 'Workflow Timing',
task: 'Task',
task_instance: 'Task Instance',
task_definition: 'Task Definition',
file_manage: 'File Manage',
resource_manage: 'Resource Manage',
function_manage: 'Function Manage',

1
dolphinscheduler-ui/src/locales/zh_CN/menu.ts

@ -33,7 +33,6 @@ export default {
workflow_timing: '工作流定时',
task: '任务',
task_instance: '任务实例',
task_definition: '任务定义',
file_manage: '文件管理',
resource_manage: '资源管理',
function_manage: '函数管理',

11
dolphinscheduler-ui/src/router/modules/projects.ts

@ -179,17 +179,6 @@ export default {
auth: []
}
},
{
path: '/projects/:projectCode/task/definitions',
name: 'task-definition',
component: components['projects-task-definition'],
meta: {
title: '任务定义',
activeMenu: 'projects',
showSide: true,
auth: []
}
},
{
path: '/projects/:projectCode/task/instances',
name: 'task-instance',

134
dolphinscheduler-ui/src/service/modules/task-definition/index.ts

@ -16,29 +16,7 @@
*/
import { axios } from '@/service/service'
import {
PageReq,
ProjectCodeReq,
TaskDefinitionListReq,
TaskDefinitionJsonReq,
CodeReq,
TaskDefinitionJsonObjReq,
ReleaseStateReq,
VersionReq,
ISingleSaveReq,
TaskDefinitionReq
} from './types'
export function queryTaskDefinitionListPaging(
params: TaskDefinitionListReq,
projectCode: ProjectCodeReq
): any {
return axios({
url: `/projects/${projectCode.projectCode}/task-definition`,
method: 'get',
params
})
}
import { ProjectCodeReq, TaskDefinitionJsonReq } from './types'
export function save(
data: TaskDefinitionJsonReq,
@ -60,113 +38,3 @@ export function genTaskCodeList(num: number, projectCode: number) {
}
})
}
export function queryTaskDefinitionByCode(
code: number,
projectCode: number
): any {
return axios({
url: `/projects/${projectCode}/task-definition/${code}`,
method: 'get'
})
}
export function updateTask(
projectCode: number,
code: number,
data: TaskDefinitionJsonObjReq
): any {
return axios({
url: `/projects/${projectCode}/task-definition/${code}`,
method: 'put',
data
})
}
export function deleteTaskDefinition(
code: CodeReq,
projectCode: ProjectCodeReq
): any {
return axios({
url: `/projects/${projectCode.projectCode}/task-definition/${code.code}`,
method: 'delete'
})
}
export function releaseTaskDefinition(
data: ReleaseStateReq,
code: number,
projectCode: number
): any {
return axios({
url: `/projects/${projectCode}/task-definition/${code}/release`,
method: 'post',
data
})
}
export function queryTaskVersions(
params: PageReq,
code: CodeReq,
projectCode: ProjectCodeReq
): any {
return axios({
url: `/projects/${projectCode.projectCode}/task-definition/${code.code}/versions`,
method: 'get',
params
})
}
export function switchVersion(
version: VersionReq,
code: CodeReq,
projectCode: ProjectCodeReq
): any {
return axios({
url: `/projects/${projectCode.projectCode}/task-definition/${code.code}/versions/${version.version}`,
method: 'get'
})
}
export function deleteVersion(
version: VersionReq,
code: CodeReq,
projectCode: ProjectCodeReq
): any {
return axios({
url: `/projects/${projectCode.projectCode}/task-definition/${code.code}/versions/${version.version}`,
method: 'delete'
})
}
export function saveSingle(projectCode: number, data: ISingleSaveReq) {
return axios({
url: `/projects/${projectCode}/task-definition/save-single`,
method: 'post',
data
})
}
export function updateWithUpstream(
projectCode: number,
code: number,
data: ISingleSaveReq
) {
return axios({
url: `/projects/${projectCode}/task-definition/${code}/with-upstream`,
method: 'put',
data
})
}
export function startTaskDefinition(
projectCode: number,
code: number,
data: TaskDefinitionReq
) {
return axios({
url: `projects/${projectCode}/executors/task-instance/${code}/start`,
method: 'post',
data
})
}

135
dolphinscheduler-ui/src/service/modules/task-definition/types.ts

@ -15,145 +15,12 @@
* limitations under the License.
*/
interface PageReq {
pageNo: number
pageSize: number
}
interface ListReq extends PageReq {
searchVal?: string
}
interface ProjectCodeReq {
projectCode: number
}
interface TaskDefinitionListReq extends ListReq {
taskType?: string
userId?: number
taskExecuteType?: 'BATCH' | 'STREAM'
}
interface TaskDefinitionJsonReq {
taskDefinitionJson: string
}
interface CodeReq {
code: any
}
interface TaskDefinitionJsonObjReq {
taskDefinitionJsonObj: string
taskExecuteType?: string
}
interface ReleaseStateReq {
releaseState: 'OFFLINE' | 'ONLINE'
}
interface VersionReq {
version: number
}
interface TaskDefinitionItem {
taskName: string
taskCode: any
taskVersion: number
taskType: string
taskCreateTime: string
taskUpdateTime: string
processDefinitionCode: any
processDefinitionVersion: number
processDefinitionName: string
processReleaseState: string
upstreamTaskMap: any
upstreamTaskCode: number
upstreamTaskName: string
}
interface TaskDefinitionRes {
totalList: TaskDefinitionItem[]
total: number
totalPage: number
pageSize: number
currentPage: number
start: number
}
interface TaskDefinitionVersionItem {
id: number
code: number
name: string
version: number
description: string
projectCode: number
userId: number
taskType: string
taskParams: any
taskParamList: any[]
taskParamMap: any
flag: string
taskPriority: string
userName?: any
projectName?: any
workerGroup: string
environmentCode: number
failRetryTimes: number
failRetryInterval: number
timeoutFlag: string
timeoutNotifyStrategy?: any
timeout: number
delayTime: number
resourceIds: string
createTime: string
updateTime: string
modifyBy?: any
taskGroupId: number
taskGroupPriority: number
operator: number
operateTime: string
dependence: string
}
interface TaskDefinitionVersionRes {
totalList: TaskDefinitionVersionItem[]
total: number
totalPage: number
pageSize: number
currentPage: number
start: number
}
interface ISingleSaveReq {
processDefinitionCode?: string
upstreamCodes: string
taskDefinitionJsonObj: string
}
interface TaskDefinitionReq {
version: number
warningType: string
warningGroupId: number
workerGroup?: string
environmentCode?: number
startParams?: string
dryRun?: number
}
export {
PageReq,
ListReq,
ProjectCodeReq,
TaskDefinitionListReq,
TaskDefinitionJsonReq,
CodeReq,
TaskDefinitionJsonObjReq,
ReleaseStateReq,
VersionReq,
TaskDefinitionItem,
TaskDefinitionRes,
TaskDefinitionVersionItem,
TaskDefinitionVersionRes,
ISingleSaveReq,
TaskDefinitionReq
}
export { ProjectCodeReq, TaskDefinitionJsonReq }

229
dolphinscheduler-ui/src/views/projects/task/definition/batch-task.tsx

@ -1,229 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
defineComponent,
getCurrentInstance,
onMounted,
toRefs,
watch
} from 'vue'
import { useRoute } from 'vue-router'
import {
NButton,
NDataTable,
NIcon,
NInput,
NPagination,
NSelect,
NSpace
} from 'naive-ui'
import { SearchOutlined } from '@vicons/antd'
import { useI18n } from 'vue-i18n'
import { useTable } from './use-table'
import { useTask } from './use-task'
import { TASK_TYPES_MAP } from '@/store/project/task-type'
import Card from '@/components/card'
import VersionModal from './components/version-modal'
import TaskModal from '@/views/projects/task/components/node/detail-modal'
import type { INodeData } from './types'
import DependenciesModal from '@/views/projects/components/dependencies/dependencies-modal'
const BatchTaskDefinition = defineComponent({
name: 'batch-task-definition',
setup() {
const route = useRoute()
const projectCode = Number(route.params.projectCode)
const { t } = useI18n()
const { task, onToggleShow, onTaskSave, onEditTask, onInitTask } =
useTask(projectCode)
const { variables, getTableData, createColumns } = useTable(onEditTask)
const requestData = () => {
getTableData({
pageSize: variables.pageSize,
pageNo: variables.page,
searchTaskName: variables.searchTaskName,
taskType: variables.taskType
})
}
const onUpdatePageSize = () => {
variables.page = 1
requestData()
}
const onSearch = () => {
variables.page = 1
requestData()
}
const onClearSearchTaskName = () => {
variables.searchTaskName = null
onSearch()
}
const onClearSearchTaskType = () => {
variables.taskType = null
onSearch()
}
const onRefresh = () => {
variables.showVersionModalRef = false
requestData()
}
const onCreate = () => {
onToggleShow(true)
}
const onTaskCancel = () => {
onToggleShow(false)
onInitTask()
}
const onTaskSubmit = async (params: { data: INodeData }) => {
const result = await onTaskSave(params.data)
if (result) {
onTaskCancel()
onRefresh()
}
}
const trim = getCurrentInstance()?.appContext.config.globalProperties.trim
onMounted(() => {
createColumns(variables)
requestData()
})
watch(useI18n().locale, () => {
createColumns(variables)
})
return {
t,
...toRefs(variables),
...toRefs(task),
onSearch,
onClearSearchTaskName,
onClearSearchTaskType,
requestData,
onUpdatePageSize,
onRefresh,
onCreate,
onTaskSubmit,
onTaskCancel,
projectCode,
trim
}
},
render() {
const {
t,
onSearch,
requestData,
onUpdatePageSize,
onRefresh,
onCreate,
loadingRef
} = this
return (
<NSpace vertical>
<Card>
<NSpace justify='space-between'>
<NButton size='small' type='primary' onClick={onCreate}>
{t('project.task.create_task')}
</NButton>
<NSpace>
<NInput
allowInput={this.trim}
size='small'
clearable
v-model={[this.searchTaskName, 'value']}
placeholder={t('project.task.task_name')}
onClear={this.onClearSearchTaskName}
/>
<NSelect
v-model={[this.taskType, 'value']}
size='small'
options={Object.keys(TASK_TYPES_MAP).map((item) => {
return { value: item, label: item }
})}
placeholder={t('project.task.task_type')}
style={{ width: '180px' }}
clearable
onClear={this.onClearSearchTaskType}
/>
<NButton size='small' type='primary' onClick={onSearch}>
<NIcon>
<SearchOutlined />
</NIcon>
</NButton>
</NSpace>
</NSpace>
</Card>
<Card title={t('project.task.batch_task')}>
<NSpace vertical>
<NDataTable
loading={loadingRef}
columns={this.columns}
data={this.tableData}
scrollX={this.tableWidth}
/>
<NSpace justify='center'>
<NPagination
v-model:page={this.page}
v-model:page-size={this.pageSize}
page-count={this.totalPage}
show-size-picker
page-sizes={[10, 30, 50]}
show-quick-jumper
onUpdatePage={requestData}
onUpdatePageSize={onUpdatePageSize}
/>
</NSpace>
</NSpace>
</Card>
<VersionModal
show={this.showVersionModalRef}
row={this.row}
onConfirm={() => (this.showVersionModalRef = false)}
onRefresh={onRefresh}
/>
<TaskModal
show={this.taskShow}
data={this.taskData}
onSubmit={this.onTaskSubmit}
onCancel={this.onTaskCancel}
projectCode={this.projectCode}
from={1}
readonly={this.taskReadonly}
saving={this.taskSaving}
/>
<DependenciesModal
v-model:show={this.dependenciesData.showRef}
v-model:taskLinks={this.dependenciesData.taskLinks}
required={this.dependenciesData.required}
content={this.dependenciesData.tip}
onConfirm={this.dependenciesData.action}
/>
</NSpace>
)
}
})
export default BatchTaskDefinition

272
dolphinscheduler-ui/src/views/projects/task/definition/components/start-modal.tsx

@ -1,272 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
defineComponent,
PropType,
toRefs,
onMounted,
watch,
getCurrentInstance
} from 'vue'
import { useI18n } from 'vue-i18n'
import Modal from '@/components/modal'
import { useStart } from './use-start'
import {
NForm,
NFormItem,
NButton,
NIcon,
NInput,
NSpace,
NSelect,
NSwitch
} from 'naive-ui'
import { DeleteOutlined, PlusCircleOutlined } from '@vicons/antd'
const props = {
row: {
type: Object,
default: {}
},
show: {
type: Boolean as PropType<boolean>,
default: false
},
taskCode: {
type: String
}
}
export default defineComponent({
name: 'task-definition-start',
props,
emits: ['update:show', 'update:row', 'updateList'],
setup(props, ctx) {
const { t } = useI18n()
const {
variables,
handleStartDefinition,
getWorkerGroups,
getTenantList,
getAlertGroups,
getEnvironmentList,
getStartParamsList
} = useStart(ctx)
const generalWarningTypeListOptions = () => [
{
value: 'NONE',
label: t('project.task.none_send')
},
{
value: 'SUCCESS',
label: t('project.task.success_send')
},
{
value: 'FAILURE',
label: t('project.task.failure_send')
},
{
value: 'ALL',
label: t('project.task.all_send')
}
]
const hideModal = () => {
ctx.emit('update:show')
}
const handleStart = () => {
handleStartDefinition(props.row.taskCode)
}
const updateWorkerGroup = () => {
variables.startForm.environmentCode = null
}
const addStartParams = () => {
variables.startState.startParamsList.push({
prop: '',
value: ''
})
}
const updateParamsList = (index: number, param: Array<string>) => {
variables.startState.startParamsList[index].prop = param[0]
variables.startState.startParamsList[index].value = param[1]
}
const removeStartParams = (index: number) => {
variables.startState.startParamsList.splice(index, 1)
}
const trim = getCurrentInstance()?.appContext.config.globalProperties.trim
onMounted(() => {
getWorkerGroups()
getTenantList()
getAlertGroups()
getEnvironmentList()
})
watch(
() => props.show,
() => {
if (props.show) {
getStartParamsList(props.row.processDefinitionCode)
}
}
)
return {
t,
hideModal,
handleStart,
updateWorkerGroup,
removeStartParams,
addStartParams,
updateParamsList,
generalWarningTypeListOptions,
...toRefs(variables),
...toRefs(variables.startState),
...toRefs(props),
trim
}
},
render() {
const { t } = this
return (
<Modal
show={this.show}
title={t('project.task.set_parameters_before_starting')}
onCancel={this.hideModal}
onConfirm={this.handleStart}
confirmLoading={this.saving}
>
<NForm ref='startFormRef' model={this.startForm}>
<NFormItem label={t('project.task.task_name')} path='task_name'>
<div title={this.row.taskName}>{this.row.taskName}</div>
</NFormItem>
<NFormItem
label={t('project.task.notification_strategy')}
path='warningType'
>
<NSelect
options={this.generalWarningTypeListOptions()}
v-model:value={this.startForm.warningType}
/>
</NFormItem>
<NFormItem label={t('project.task.worker_group')} path='workerGroup'>
<NSelect
options={this.workerGroups}
onUpdateValue={this.updateWorkerGroup}
v-model:value={this.startForm.workerGroup}
/>
</NFormItem>
<NFormItem label={t('project.task.tenant_code')} path='tenantCode'>
<NSelect
options={this.tenantList}
v-model:value={this.startForm.tenantCode}
/>
</NFormItem>
<NFormItem
label={t('project.task.environment_name')}
path='environmentCode'
>
<NSelect
options={this.environmentList.filter((item: any) =>
item.workerGroups?.includes(this.startForm.workerGroup)
)}
v-model:value={this.startForm.environmentCode}
clearable
/>
</NFormItem>
<NFormItem
label={t('project.task.alarm_group')}
path='warningGroupId'
>
<NSelect
options={this.alertGroups}
placeholder={t('project.task.please_choose')}
v-model:value={this.startForm.warningGroupId}
clearable
/>
</NFormItem>
<NFormItem
label={t('project.task.startup_parameter')}
path='startup_parameter'
>
{this.startParamsList.length === 0 ? (
<NButton text type='primary' onClick={this.addStartParams}>
<NIcon>
<PlusCircleOutlined />
</NIcon>
</NButton>
) : (
<NSpace vertical>
{this.startParamsList.map((item, index) => (
<NSpace key={Date.now() + index}>
<NInput
allowInput={this.trim}
pair
separator=':'
placeholder={['prop', 'value']}
defaultValue={[item.prop, item.value]}
onUpdateValue={(param) =>
this.updateParamsList(index, param)
}
/>
<NButton
text
type='error'
onClick={() => this.removeStartParams(index)}
class='btn-delete-custom-parameter'
>
<NIcon>
<DeleteOutlined />
</NIcon>
</NButton>
<NButton
text
type='primary'
onClick={this.addStartParams}
class='btn-create-custom-parameter'
>
<NIcon>
<PlusCircleOutlined />
</NIcon>
</NButton>
</NSpace>
))}
</NSpace>
)}
</NFormItem>
<NFormItem label={t('project.task.whether_dry_run')} path='dryRun'>
<NSwitch
checkedValue={1}
uncheckedValue={0}
v-model:value={this.startForm.dryRun}
/>
</NFormItem>
</NForm>
</Modal>
)
}
})

156
dolphinscheduler-ui/src/views/projects/task/definition/components/use-start.tsx

@ -1,156 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { startTaskDefinition } from '@/service/modules/task-definition'
import _ from 'lodash'
import { reactive, ref, SetupContext } from 'vue'
import { useI18n } from 'vue-i18n'
import { useRoute } from 'vue-router'
import { queryProcessDefinitionByCode } from '@/service/modules/process-definition'
import { queryAllWorkerGroups } from '@/service/modules/worker-groups'
import { queryTenantList } from '@/service/modules/tenants'
import { queryAllEnvironmentList } from '@/service/modules/environment'
import { listAlertGroupById } from '@/service/modules/alert-group'
import type { EnvironmentItem } from '@/service/modules/environment/types'
import type { IStartState } from '../types'
export const useStart = (
ctx: SetupContext<('update:show' | 'update:row' | 'updateList')[]>
) => {
const { t } = useI18n()
const route = useRoute()
const variables = reactive({
startFormRef: ref(),
startForm: {
version: 1,
warningType: 'NONE',
warningGroupId: null,
workerGroup: 'default',
tenantCode: 'default',
environmentCode: null,
startParams: null as null | string,
dryRun: 0
},
startState: {
projectCode: Number(route.params.projectCode),
workerGroups: [],
tenantList: [],
alertGroups: [],
environmentList: [],
startParamsList: []
} as IStartState,
saving: false
})
const cachedStartParams = {} as {
[key: string]: { prop: string; value: string }[]
}
const getWorkerGroups = () => {
queryAllWorkerGroups().then((res: any) => {
variables.startState.workerGroups = res.map((item: string) => ({
label: item,
value: item
}))
})
}
const getTenantList = () => {
queryTenantList().then((res: any) => {
variables.startState.tenantList = res.map((item: any) => ({
label: item.tenantCode,
value: item.tenantCode
}))
})
}
const getEnvironmentList = () => {
queryAllEnvironmentList().then((res: Array<EnvironmentItem>) => {
variables.startState.environmentList = res.map((item) => ({
label: item.name,
value: item.code,
workerGroups: item.workerGroups
}))
})
}
const getAlertGroups = () => {
listAlertGroupById().then((res: any) => {
variables.startState.alertGroups = res.map((item: any) => ({
label: item.groupName,
value: item.id
}))
})
}
const getStartParamsList = (code: number) => {
if (cachedStartParams[code]) {
variables.startState.startParamsList = _.cloneDeep(
cachedStartParams[code]
)
return
}
queryProcessDefinitionByCode(code, variables.startState.projectCode).then(
(res: any) => {
variables.startState.startParamsList =
res.processDefinition.globalParamList
cachedStartParams[code] = _.cloneDeep(
variables.startState.startParamsList
)
}
)
}
const handleStartDefinition = async (code: number) => {
await variables.startFormRef.validate()
if (variables.saving) return
variables.saving = true
try {
const startParams = {} as any
for (const item of variables.startState.startParamsList) {
if (item.value !== '') {
startParams[item.prop] = item.value
}
}
variables.startForm.startParams = !_.isEmpty(startParams)
? JSON.stringify(startParams)
: ''
await startTaskDefinition(variables.startState.projectCode, code, {
...variables.startForm
} as any)
window.$message.success(t('project.task.success'))
variables.saving = false
ctx.emit('updateList')
ctx.emit('update:show')
} catch (err) {
variables.saving = false
}
}
return {
variables,
getWorkerGroups,
getTenantList,
getEnvironmentList,
getAlertGroups,
getStartParamsList,
handleStartDefinition
}
}

213
dolphinscheduler-ui/src/views/projects/task/definition/components/use-version.ts

@ -1,213 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { useI18n } from 'vue-i18n'
import { h, reactive, ref } from 'vue'
import { NButton, NPopconfirm, NSpace, NTag, NTooltip } from 'naive-ui'
import { DeleteOutlined, CheckOutlined } from '@vicons/antd'
import { useAsyncState } from '@vueuse/core'
import {
queryTaskVersions,
switchVersion,
deleteVersion
} from '@/service/modules/task-definition'
import { useRoute } from 'vue-router'
import type {
TaskDefinitionVersionRes,
TaskDefinitionVersionItem
} from '@/service/modules/task-definition/types'
export function useVersion() {
const { t } = useI18n()
const route = useRoute()
const projectCode = Number(route.params.projectCode)
const createColumns = (variables: any) => {
variables.columns = [
{
title: '#',
key: 'index',
render: (row: any, index: number) => index + 1
},
{
title: t('project.task.version'),
key: 'version',
render: (row: TaskDefinitionVersionItem) =>
h(
'span',
null,
row.version !== variables.taskVersion
? 'v' + row.version
: h(
NTag,
{ type: 'success', size: 'small' },
{
default: () =>
`v${row.version} ${t('project.task.current_version')}`
}
)
)
},
{
title: t('project.task.description'),
key: 'description',
render: (row: TaskDefinitionVersionItem) =>
h('span', null, row.description ? row.description : '-')
},
{
title: t('project.task.create_time'),
key: 'createTime'
},
{
title: t('project.task.operation'),
key: 'operation',
render(row: TaskDefinitionVersionItem) {
return h(NSpace, null, {
default: () => [
h(
NPopconfirm,
{
onPositiveClick: () => {
handleSwitchVersion(row)
}
},
{
trigger: () =>
h(
NTooltip,
{},
{
trigger: () =>
h(
NButton,
{
circle: true,
type: 'info',
size: 'small',
disabled: row.version === variables.taskVersion
},
{
icon: () => h(CheckOutlined)
}
),
default: () => t('project.task.switch_version')
}
),
default: () => t('project.task.confirm_switch_version')
}
),
h(
NPopconfirm,
{
onPositiveClick: () => {
handleDelete(row)
}
},
{
trigger: () =>
h(
NTooltip,
{},
{
trigger: () =>
h(
NButton,
{
circle: true,
type: 'error',
size: 'small',
disabled: row.version === variables.taskVersion
},
{
icon: () => h(DeleteOutlined)
}
),
default: () => t('project.task.delete')
}
),
default: () => t('project.task.delete_confirm')
}
)
]
})
}
}
]
}
const variables = reactive({
columns: [],
tableData: [],
page: ref(1),
pageSize: ref(10),
totalPage: ref(1),
taskVersion: ref(null),
taskCode: ref(null),
refreshTaskDefinition: ref(false),
row: {},
loadingRef: ref(false)
})
const handleSwitchVersion = (row: TaskDefinitionVersionItem) => {
switchVersion(
{ version: row.version },
{ code: variables.taskCode },
{ projectCode }
).then(() => {
variables.refreshTaskDefinition = true
})
}
const handleDelete = (row: TaskDefinitionVersionItem) => {
deleteVersion(
{ version: row.version },
{ code: variables.taskCode },
{ projectCode }
).then(() => {
variables.refreshTaskDefinition = true
})
}
const getTableData = (params: any) => {
if (variables.loadingRef) return
variables.loadingRef = true
const { state } = useAsyncState(
queryTaskVersions(
{ ...params },
{ code: variables.taskCode },
{ projectCode }
).then((res: TaskDefinitionVersionRes) => {
variables.tableData = res.totalList.map((item, unused) => {
return {
...item
}
}) as any
variables.totalPage = res.totalPage
variables.loadingRef = false
}),
{}
)
return state
}
return {
variables,
getTableData,
createColumns
}
}

109
dolphinscheduler-ui/src/views/projects/task/definition/components/version-modal.tsx

@ -1,109 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { defineComponent, toRefs, watch } from 'vue'
import { NDataTable, NPagination, NSpace } from 'naive-ui'
import { useI18n } from 'vue-i18n'
import { useVersion } from './use-version'
import Modal from '@/components/modal'
import type { PropType } from 'vue'
const props = {
show: {
type: Boolean as PropType<boolean>,
default: false
},
row: {
type: Object as PropType<any>,
default: {}
}
}
const VersionModal = defineComponent({
name: 'VersionModal',
props,
emits: ['confirm', 'refresh'],
setup(props, ctx) {
const { t } = useI18n()
const { variables, getTableData, createColumns } = useVersion()
const requestData = () => {
getTableData({
pageSize: variables.pageSize,
pageNo: variables.page
})
}
watch(
() => props.show,
() => {
if (props.show) {
variables.taskVersion = props.row?.taskVersion
variables.taskCode = props.row?.taskCode
createColumns(variables)
requestData()
}
}
)
watch(
() => variables.refreshTaskDefinition,
() => {
if (variables.refreshTaskDefinition) {
ctx.emit('refresh')
variables.refreshTaskDefinition = false
}
}
)
const onConfirm = () => {
ctx.emit('confirm')
}
return { t, ...toRefs(variables), requestData, onConfirm }
},
render() {
const { t, requestData, onConfirm, show, loadingRef } = this
return (
<Modal
show={show}
title={t('project.task.version')}
cancelShow={false}
onConfirm={onConfirm}
>
<NSpace vertical>
<NDataTable
loading={loadingRef}
columns={this.columns}
data={this.tableData}
/>
<NSpace justify='center'>
<NPagination
v-model:page={this.page}
v-model:page-size={this.pageSize}
page-count={this.totalPage}
onUpdatePage={requestData}
/>
</NSpace>
</NSpace>
</Modal>
)
}
})
export default VersionModal

41
dolphinscheduler-ui/src/views/projects/task/definition/index.tsx

@ -1,41 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { defineComponent } from 'vue'
import { useI18n } from 'vue-i18n'
import { NTabPane, NTabs } from 'naive-ui'
import BatchTaskDefinition from './batch-task'
import StreamTaskDefinition from './stream-task'
const TaskDefinition = defineComponent({
name: 'task-definition',
setup() {
const { t } = useI18n()
return () => (
<NTabs type='line' animated>
<NTabPane name='Batch' tab={t('project.task.batch_task')}>
<BatchTaskDefinition />
</NTabPane>
<NTabPane name='Stream' tab={t('project.task.stream_task')}>
<StreamTaskDefinition />
</NTabPane>
</NTabs>
)
}
})
export default TaskDefinition

152
dolphinscheduler-ui/src/views/projects/task/definition/stream-task.tsx

@ -1,152 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { useI18n } from 'vue-i18n'
import { SearchOutlined } from '@vicons/antd'
import { defineComponent, getCurrentInstance, watch, onMounted } from 'vue'
import {
NInput,
NButton,
NIcon,
NSpace,
NDataTable,
NPagination
} from 'naive-ui'
import { useRoute } from 'vue-router'
import { useTable } from './use-stream-table'
import { useTask } from './use-task'
import StartModal from './components/start-modal'
import Card from '@/components/card'
import TaskModal from '@/views/projects/task/components/node/detail-modal'
import type { INodeData } from './types'
const StreamTaskDefinition = defineComponent({
name: 'stream-task-definition',
setup() {
const { t } = useI18n()
const route = useRoute()
const projectCode = Number(route.params.projectCode)
const trim = getCurrentInstance()?.appContext.config.globalProperties.trim
const { task, onToggleShow, onEditTask, onInitTask, onUpdateTask } =
useTask(projectCode)
const { variables, getTableData, createColumns } = useTable(onEditTask)
const onSearch = () => {
variables.page = 1
getTableData()
}
const onRefresh = () => {
getTableData()
}
const onUpdatePageSize = () => {
variables.page = 1
getTableData()
}
const onTaskCancel = () => {
onToggleShow(false)
onInitTask()
}
const onTaskSubmit = async (params: { data: INodeData }) => {
const result = await onUpdateTask(params.data)
if (result) {
onTaskCancel()
onRefresh()
}
}
onMounted(() => {
createColumns(variables)
getTableData()
})
watch(useI18n().locale, () => {
createColumns(variables)
})
return () => (
<NSpace vertical>
<Card>
<NSpace justify='end'>
<NInput
allowInput={trim}
size='small'
clearable
v-model={[variables.searchTaskName, 'value']}
placeholder={t('project.task.task_name')}
/>
<NInput
allowInput={trim}
size='small'
clearable
v-model={[variables.searchWorkflowName, 'value']}
placeholder={t('project.task.workflow_name')}
/>
<NButton size='small' type='primary' onClick={onSearch}>
<NIcon>
<SearchOutlined />
</NIcon>
</NButton>
</NSpace>
</Card>
<Card>
<NSpace vertical>
<NDataTable
loading={variables.loading}
columns={variables.columns}
data={variables.tableData}
scrollX={variables.tableWidth}
/>
<NSpace justify='center'>
<NPagination
v-model:page={variables.page}
v-model:page-size={variables.pageSize}
page-count={variables.totalPage}
show-size-picker
page-sizes={[10, 30, 50]}
show-quick-jumper
onUpdatePage={getTableData}
onUpdatePageSize={onUpdatePageSize}
/>
</NSpace>
</NSpace>
</Card>
<TaskModal
show={task.taskShow}
data={task.taskData}
onSubmit={onTaskSubmit}
onCancel={onTaskCancel}
projectCode={projectCode}
from={1}
readonly={task.taskReadonly}
saving={task.taskSaving}
/>
<StartModal
v-model:row={variables.row}
v-model:show={variables.startShow}
onUpdateList={getTableData}
/>
</NSpace>
)
}
})
export default StreamTaskDefinition

40
dolphinscheduler-ui/src/views/projects/task/definition/types.ts

@ -1,40 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { IOption } from '@/components/form/types'
import { IParam } from '../../workflow/definition/components/types'
import { IEnvironmentNameOption } from '../components/node/types'
export type { ITaskData, INodeData } from '../components/node/types'
export type { ISingleSaveReq } from '@/service/modules/task-definition/types'
interface IRecord {
processDefinitionCode: number
taskCode: number
taskName: string
}
interface IStartState {
projectCode: number
workerGroups: Array<IOption>
tenantList: Array<IOption>
alertGroups: Array<IOption>
environmentList: Array<IEnvironmentNameOption>
startParamsList: Array<IParam>
}
export { IRecord, IStartState }

202
dolphinscheduler-ui/src/views/projects/task/definition/use-stream-table.ts

@ -1,202 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive, h } from 'vue'
import { NButton, NIcon, NSpace, NTooltip, NEllipsis } from 'naive-ui'
import ButtonLink from '@/components/button-link'
import { useI18n } from 'vue-i18n'
import { EditOutlined, PlayCircleOutlined } from '@vicons/antd'
import { queryTaskDefinitionListPaging } from '@/service/modules/task-definition'
import { useRoute } from 'vue-router'
import {
COLUMN_WIDTH_CONFIG,
calculateTableWidth,
DefaultTableWidth
} from '@/common/column-width-config'
import type {
TaskDefinitionItem,
TaskDefinitionRes
} from '@/service/modules/task-definition/types'
import type { IRecord } from './types'
export function useTable(onEdit: Function) {
const { t } = useI18n()
const route = useRoute()
const projectCode = Number(route.params.projectCode)
const createColumns = (variables: any) => {
variables.columns = [
{
title: '#',
key: 'index',
render: (row: any, index: number) => index + 1,
...COLUMN_WIDTH_CONFIG['index']
},
{
title: t('project.task.task_name'),
key: 'taskName',
...COLUMN_WIDTH_CONFIG['linkName'],
render: (row: IRecord) =>
h(
ButtonLink,
{
onClick: () => void onEdit(row, true)
},
{
default: () =>
h(
NEllipsis,
COLUMN_WIDTH_CONFIG['linkEllipsis'],
() => row.taskName
)
}
)
},
{
title: t('project.task.version'),
key: 'taskVersion',
render: (row: TaskDefinitionItem) =>
h('span', null, 'v' + row.taskVersion),
...COLUMN_WIDTH_CONFIG['version']
},
{
title: t('project.task.workflow_name'),
key: 'processDefinitionName',
...COLUMN_WIDTH_CONFIG['name']
},
{
title: t('project.task.task_type'),
key: 'taskType',
...COLUMN_WIDTH_CONFIG['type']
},
{
title: t('project.task.create_time'),
key: 'taskCreateTime',
...COLUMN_WIDTH_CONFIG['time']
},
{
title: t('project.task.update_time'),
key: 'taskUpdateTime',
...COLUMN_WIDTH_CONFIG['time']
},
{
title: t('project.task.operation'),
key: 'operation',
...COLUMN_WIDTH_CONFIG['operation'](2),
render(row: any) {
return h(NSpace, null, {
default: () => [
h(
NTooltip,
{},
{
trigger: () =>
h(
NButton,
{
circle: true,
type: 'info',
size: 'small',
onClick: () => onStart(row)
},
{
icon: () =>
h(NIcon, null, {
default: () => h(PlayCircleOutlined)
})
}
),
default: () => t('project.task.execute')
}
),
h(
NTooltip,
{},
{
trigger: () =>
h(
NButton,
{
circle: true,
type: 'info',
size: 'small',
onClick: () => onEdit(row, false)
},
{
icon: () =>
h(NIcon, null, { default: () => h(EditOutlined) })
}
),
default: () => t('project.task.edit')
}
)
]
})
}
}
]
if (variables.tableWidth) {
variables.tableWidth = calculateTableWidth(variables.columns)
}
}
const variables = reactive({
columns: [],
tableWidth: DefaultTableWidth,
tableData: [],
page: 1,
pageSize: 10,
searchTaskName: null,
searchWorkflowName: null,
totalPage: 1,
row: {},
loading: false,
startShow: false
})
const getTableData = () => {
if (variables.loading) return
variables.loading = true
const params = {
pageSize: variables.pageSize,
pageNo: variables.page,
searchTaskName: variables.searchTaskName,
searchWorkflowName: variables.searchWorkflowName,
taskExecuteType: 'STREAM' as 'BATCH' | 'STREAM'
} as any
queryTaskDefinitionListPaging(params, { projectCode })
.then((res: TaskDefinitionRes) => {
variables.tableData = [...res.totalList] as any
variables.totalPage = res.totalPage
})
.finally(() => {
variables.loading = false
})
}
const onStart = (row: any) => {
variables.row = row
variables.startShow = true
}
return {
variables,
getTableData,
createColumns
}
}

348
dolphinscheduler-ui/src/views/projects/task/definition/use-table.ts

@ -1,348 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { useAsyncState } from '@vueuse/core'
import { reactive, h, ref } from 'vue'
import {
NButton,
NIcon,
NPopconfirm,
NSpace,
NTag,
NTooltip,
NEllipsis
} from 'naive-ui'
import ButtonLink from '@/components/button-link'
import { useI18n } from 'vue-i18n'
import {
DeleteOutlined,
EditOutlined,
ExclamationCircleOutlined
} from '@vicons/antd'
import {
queryTaskDefinitionListPaging,
deleteTaskDefinition
} from '@/service/modules/task-definition'
import { useRoute } from 'vue-router'
import {
COLUMN_WIDTH_CONFIG,
calculateTableWidth,
DefaultTableWidth
} from '@/common/column-width-config'
import type {
TaskDefinitionItem,
TaskDefinitionRes
} from '@/service/modules/task-definition/types'
import type { IRecord } from './types'
import { useDependencies } from '../../components/dependencies/use-dependencies'
export function useTable(onEdit: Function) {
const { t } = useI18n()
const route = useRoute()
const projectCode = Number(route.params.projectCode)
const { getDependentTaskLinksByTask } = useDependencies()
const createColumns = (variables: any) => {
variables.columns = [
{
title: '#',
key: 'index',
render: (row: any, index: number) => index + 1,
...COLUMN_WIDTH_CONFIG['index']
},
{
title: t('project.task.task_name'),
key: 'taskName',
...COLUMN_WIDTH_CONFIG['linkName'],
resizable: true,
minWidth: 200,
maxWidth: 600,
render: (row: IRecord) =>
h(
ButtonLink,
{
onClick: () => void onEdit(row, true)
},
{
default: () =>
h(
NEllipsis,
{
style: 'max-width: 580px;line-height: 1.5'
},
() => row.taskName
)
}
)
},
{
title: t('project.task.workflow_name'),
key: 'processDefinitionName',
...COLUMN_WIDTH_CONFIG['name'],
resizable: true,
minWidth: 200,
maxWidth: 600
},
{
title: t('project.task.workflow_state'),
key: 'processReleaseState',
render: (row: any) => {
if (row.processReleaseState === 'OFFLINE') {
return h(NTag, { type: 'error', size: 'small' }, () =>
t('project.task.offline')
)
} else if (row.processReleaseState === 'ONLINE') {
return h(NTag, { type: 'info', size: 'small' }, () =>
t('project.task.online')
)
}
},
width: 130
},
{
title: t('project.task.task_type'),
key: 'taskType',
...COLUMN_WIDTH_CONFIG['type']
},
{
title: t('project.task.version'),
key: 'taskVersion',
render: (row: TaskDefinitionItem) =>
h('span', null, 'v' + row.taskVersion),
...COLUMN_WIDTH_CONFIG['version']
},
{
title: t('project.task.upstream_tasks'),
key: 'upstreamTaskMap',
render: (row: TaskDefinitionItem) =>
row.upstreamTaskMap.map((item: string, index: number) => {
return h('p', null, { default: () => `[${index + 1}] ${item}` })
}),
...COLUMN_WIDTH_CONFIG['name']
},
{
title: t('project.task.create_time'),
key: 'taskCreateTime',
...COLUMN_WIDTH_CONFIG['time']
},
{
title: t('project.task.update_time'),
key: 'taskUpdateTime',
...COLUMN_WIDTH_CONFIG['time']
},
{
title: t('project.task.operation'),
key: 'operation',
...COLUMN_WIDTH_CONFIG['operation'](3),
render(row: any) {
return h(NSpace, null, {
default: () => [
h(
NTooltip,
{},
{
trigger: () =>
h(
NButton,
{
circle: true,
type: 'info',
size: 'small',
disabled:
['CONDITIONS', 'SWITCH'].includes(row.taskType) ||
(!!row.processDefinitionCode &&
row.processReleaseState === 'ONLINE'),
onClick: () => {
onEdit(row, false)
}
},
{
icon: () =>
h(NIcon, null, { default: () => h(EditOutlined) })
}
),
default: () => t('project.task.edit')
}
),
h(
NTooltip,
{},
{
trigger: () =>
h(
NButton,
{
circle: true,
type: 'info',
size: 'small',
onClick: () => {
variables.showVersionModalRef = true
variables.row = row
}
},
{
icon: () =>
h(NIcon, null, {
default: () => h(ExclamationCircleOutlined)
})
}
),
default: () => t('project.task.version')
}
),
h(
NPopconfirm,
{
onPositiveClick: () => {
handleDelete(row)
}
},
{
trigger: () =>
h(
NTooltip,
{},
{
trigger: () =>
h(
NButton,
{
circle: true,
type: 'error',
size: 'small',
disabled:
!!row.processDefinitionCode &&
row.processReleaseState === 'ONLINE'
},
{
icon: () =>
h(NIcon, null, {
default: () => h(DeleteOutlined)
})
}
),
default: () => t('project.task.delete')
}
),
default: () => t('project.task.delete_confirm')
}
)
]
})
}
}
]
if (variables.tableWidth) {
variables.tableWidth = calculateTableWidth(variables.columns)
}
}
const variables = reactive({
columns: [],
tableWidth: DefaultTableWidth,
tableData: [],
page: ref(1),
pageSize: ref(10),
searchTaskName: ref(null),
searchWorkflowName: ref(null),
totalPage: ref(1),
taskType: ref(null),
showVersionModalRef: ref(false),
dependentTasksShowRef: ref(false),
dependentTaskLinksRef: ref([]),
row: {},
loadingRef: ref(false),
dependenciesData: ref({
showRef: ref(false),
taskLinks: ref([]),
required: ref(false),
tip: ref(''),
action: () => {}
})
})
const handleDelete = (row: any) => {
variables.row = row
getDependentTaskLinksByTask(
projectCode,
row.processDefinitionCode,
row.taskCode
).then((res: any) => {
if (res && res.length > 0) {
variables.dependenciesData = {
showRef: true,
taskLinks: res,
tip: t('project.workflow.delete_validate_dependent_tasks_desc'),
required: true,
action: () => {}
}
} else {
deleteTaskDefinition({ code: row.taskCode }, { projectCode }).then(
() => {
getTableData({
pageSize: variables.pageSize,
pageNo:
variables.tableData.length === 1 && variables.page > 1
? variables.page - 1
: variables.page,
searchTaskName: variables.searchTaskName,
searchWorkflowName: variables.searchWorkflowName,
taskType: variables.taskType
})
}
)
}
})
}
const getTableData = (params: any) => {
if (variables.loadingRef) return
variables.loadingRef = true
const { state } = useAsyncState(
queryTaskDefinitionListPaging({ ...params }, { projectCode }).then(
(res: TaskDefinitionRes) => {
variables.tableData = res.totalList.map((item, unused) => {
if (Object.keys(item.upstreamTaskMap).length > 0) {
item.upstreamTaskMap = Object.keys(item.upstreamTaskMap).map(
(code) => item.upstreamTaskMap[code]
)
} else {
item.upstreamTaskMap = []
}
return {
...item
}
}) as any
variables.totalPage = res.totalPage
variables.loadingRef = false
}
),
{}
)
return state
}
return {
variables,
getTableData,
createColumns
}
}

140
dolphinscheduler-ui/src/views/projects/task/definition/use-task.ts

@ -1,140 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive } from 'vue'
import {
updateTask,
genTaskCodeList,
saveSingle,
queryTaskDefinitionByCode,
updateWithUpstream
} from '@/service/modules/task-definition'
import { formatParams as formatData } from '../components/node/format-data'
import type { ITaskData, INodeData, ISingleSaveReq, IRecord } from './types'
import { Connect } from '../../workflow/components/dag/types'
export function useTask(projectCode: number) {
const initalTask = {
taskType: 'SHELL'
} as ITaskData
const task = reactive({
taskShow: false,
taskData: { ...initalTask },
taskSaving: false,
taskReadonly: false
} as { taskShow: boolean; taskData: ITaskData; taskSaving: boolean; taskReadonly: boolean })
const formatParams = (data: INodeData, isCreate: boolean): ISingleSaveReq => {
const params = formatData(data)
if (isCreate) {
return {
processDefinitionCode: params.processDefinitionCode,
upstreamCodes: params.upstreamCodes,
taskDefinitionJsonObj: JSON.stringify(params.taskDefinitionJsonObj)
}
}
return {
upstreamCodes: params.upstreamCodes,
taskDefinitionJsonObj: JSON.stringify(params.taskDefinitionJsonObj)
}
}
const getTaskCode = async () => {
const result = await genTaskCodeList(1, projectCode)
return result[0]
}
const onToggleShow = (show: boolean) => {
task.taskShow = show
}
const onTaskSave = async (data: INodeData) => {
if (task.taskSaving) return
task.taskSaving = true
try {
if (data.id) {
data.code &&
(await updateWithUpstream(
projectCode,
data.code,
formatParams({ ...data, code: data.code }, false)
))
} else {
const taskCode = await getTaskCode()
await saveSingle(
projectCode,
formatParams({ ...data, code: taskCode }, true)
)
}
task.taskSaving = false
return true
} catch (err) {
task.taskSaving = false
return false
}
}
const onEditTask = async (row: IRecord, readonly: boolean) => {
const result = await queryTaskDefinitionByCode(row.taskCode, projectCode)
task.taskData = {
...result,
processName: row.processDefinitionCode,
preTasks:
result?.processTaskRelationList?.map(
(item: Connect) => item.preTaskCode
) || []
}
task.taskShow = true
task.taskReadonly = readonly
}
const onInitTask = () => {
task.taskData = { ...initalTask }
task.taskReadonly = false
}
const onUpdateTask = async (data: INodeData) => {
if (task.taskSaving || !data.code) return
task.taskSaving = true
const params = {
taskExecuteType: 'STREAM',
taskDefinitionJsonObj: JSON.stringify(
formatData(data).taskDefinitionJsonObj
)
}
try {
await updateTask(projectCode, data.code, params)
task.taskSaving = false
return true
} catch (err) {
task.taskSaving = false
return false
}
}
return {
task,
onToggleShow,
onTaskSave,
onEditTask,
onInitTask,
onUpdateTask
}
}
Loading…
Cancel
Save