From 0e1c8d81530ad530779469598f03410ceb9b067b Mon Sep 17 00:00:00 2001 From: Jiajie Zhong Date: Thu, 13 Oct 2022 09:41:03 +0800 Subject: [PATCH] [Feature][API] New restful API task and task relation (#12116) --- .../ProcessTaskRelationV2Controller.java | 132 +++++ .../api/controller/ScheduleV2Controller.java | 2 +- .../controller/TaskDefinitionController.java | 5 +- .../TaskDefinitionV2Controller.java | 170 ++++++ .../api/controller/WorkflowV2Controller.java | 2 +- .../api/dto/task/TaskCreateRequest.java | 141 +++++ .../api/dto/task/TaskFilterRequest.java | 62 +++ .../api/dto/task/TaskUpdateRequest.java | 175 ++++++ .../TaskRelationCreateRequest.java | 58 ++ .../TaskRelationDeleteRequest.java | 51 ++ .../TaskRelationFilterRequest.java | 73 +++ .../TaskRelationUpdateUpstreamRequest.java | 54 ++ .../dto/workflow/WorkflowUpdateRequest.java | 12 + .../dolphinscheduler/api/enums/Status.java | 13 + .../service/ProcessTaskRelationService.java | 39 +- .../api/service/TaskDefinitionService.java | 64 ++- .../impl/ProcessTaskRelationServiceImpl.java | 245 ++++++++- .../impl/TaskDefinitionServiceImpl.java | 345 +++++++++--- .../ProcessTaskRelationServiceTest.java | 257 ++++++--- .../TaskDefinitionServiceImplTest.java | 516 ++++++++++++------ .../dao/entity/ProcessDefinitionLog.java | 1 - .../dao/entity/ProcessTaskRelation.java | 26 +- .../dao/entity/ProcessTaskRelationLog.java | 1 - .../dao/entity/TaskDefinitionLog.java | 1 - .../dao/mapper/ProcessTaskRelationMapper.java | 23 +- .../dao/mapper/TaskDefinitionMapper.java | 12 +- .../dao/mapper/ProcessTaskRelationMapper.xml | 23 + .../dao/mapper/TaskDefinitionMapper.xml | 20 + .../service/process/ProcessServiceImpl.java | 17 +- 29 files changed, 2188 insertions(+), 352 deletions(-) create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionV2Controller.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskFilterRequest.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskUpdateRequest.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationCreateRequest.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationDeleteRequest.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationFilterRequest.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationUpdateUpstreamRequest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java new file mode 100644 index 0000000000..8326467581 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.controller; + +import static org.apache.dolphinscheduler.api.enums.Status.CREATE_PROCESS_TASK_RELATION_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_PROCESS_RELATION_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_UPSTREAM_TASK_PROCESS_RELATION_ERROR; + +import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; +import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationCreateRequest; +import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationDeleteRequest; +import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest; +import org.apache.dolphinscheduler.api.exceptions.ApiException; +import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.User; + +import springfox.documentation.annotations.ApiIgnore; + +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestAttribute; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; + +/** + * process task relation controller + */ +@Api(tags = "PROCESS_TASK_RELATION_TAG") +@RestController +@RequestMapping("v2/relations") +public class ProcessTaskRelationV2Controller extends BaseController { + + @Autowired + private ProcessTaskRelationService processTaskRelationService; + + /** + * create resource process task relation + * + * @param loginUser login user + * @param TaskRelationCreateRequest process task definition json contains the object you want to create + * @return Result object created + */ + @ApiOperation(value = "create", notes = "CREATE_PROCESS_TASK_RELATION_NOTES") + @PostMapping(consumes = {"application/json"}) + @ResponseStatus(HttpStatus.CREATED) + @ApiException(CREATE_PROCESS_TASK_RELATION_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result createTaskRelation(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestBody TaskRelationCreateRequest TaskRelationCreateRequest) { + ProcessTaskRelation processTaskRelation = + processTaskRelationService.createProcessTaskRelationV2(loginUser, TaskRelationCreateRequest); + return Result.success(processTaskRelation); + } + + /** + * delete resource process task relation + * + * @param loginUser login user + * @param codePair code pair you want to delete the task relation, use `upstream,downstream` as example, will delete exists relation upstream -> downstream, throw error if not exists + * @return delete result code + */ + @ApiOperation(value = "delete", notes = "DELETE_PROCESS_TASK_RELATION_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "code-pair", value = "TASK_DEFINITION_CODE", dataTypeClass = long.class, example = "123456,78901", required = true) + }) + @DeleteMapping(value = "/{code-pair}") + @ResponseStatus(HttpStatus.OK) + @ApiException(DELETE_TASK_PROCESS_RELATION_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result deleteTaskRelation(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable("code-pair") String codePair) { + TaskRelationDeleteRequest taskRelationDeleteRequest = new TaskRelationDeleteRequest(codePair); + processTaskRelationService.deleteTaskProcessRelationV2(loginUser, taskRelationDeleteRequest.getUpstreamCode(), + taskRelationDeleteRequest.getDownstreamCode()); + return Result.success(); + } + + /** + * Update resource task relation by code, only update this code's upstreams + * + * @param loginUser login user + * @param code resource task code want to update its upstreams + * @param taskRelationUpdateUpstreamRequest workflowUpdateRequest + * @return ResourceResponse object updated + */ + @ApiOperation(value = "update", notes = "UPDATE_PROCESS_TASK_RELATION_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "code", value = "DOWMSTREAM_TASK_DEFINITION_CODE", dataTypeClass = long.class, example = "123456", required = true) + }) + @PutMapping(value = "/{code}") + @ResponseStatus(HttpStatus.OK) + @ApiException(UPDATE_UPSTREAM_TASK_PROCESS_RELATION_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result> updateUpstreamTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable("code") Long code, + @RequestBody TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) { + List processTaskRelations = processTaskRelationService + .updateUpstreamTaskDefinition(loginUser, code, taskRelationUpdateUpstreamRequest); + return Result.success(processTaskRelations); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ScheduleV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ScheduleV2Controller.java index ba26cfa227..75230a1379 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ScheduleV2Controller.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ScheduleV2Controller.java @@ -150,7 +150,7 @@ public class ScheduleV2Controller extends BaseController { * @return result Result */ @ApiOperation(value = "get", notes = "QUERY_SCHEDULE_LIST_PAGING_NOTES") - @GetMapping(consumes = {"application/json"}) + @PostMapping(value = "/filter", consumes = {"application/json"}) @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_SCHEDULE_LIST_PAGING_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java index eda294756c..b6060d78c4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java @@ -29,6 +29,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.SWITCH_TASK_DEFINITIO import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_DEFINITION_ERROR; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; +import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.Result; @@ -294,8 +295,8 @@ public class TaskDefinitionController extends BaseController { public Result deleteTaskDefinitionByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @PathVariable(value = "code") long code) { - Map result = taskDefinitionService.deleteTaskDefinitionByCode(loginUser, projectCode, code); - return returnDataList(result); + taskDefinitionService.deleteTaskDefinitionByCode(loginUser, code); + return new Result(Status.SUCCESS); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionV2Controller.java new file mode 100644 index 0000000000..dbd7ec15fe --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionV2Controller.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.controller; + +import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_DEFINITION_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.DELETE_SCHEDULE_BY_ID_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DETAIL_OF_TASK_DEFINITION_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_LIST; +import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_DEFINITION_ERROR; + +import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; +import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest; +import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest; +import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest; +import org.apache.dolphinscheduler.api.exceptions.ApiException; +import org.apache.dolphinscheduler.api.service.TaskDefinitionService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.User; + +import springfox.documentation.annotations.ApiIgnore; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestAttribute; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; + +/** + * task definition controller + */ +@Api(tags = "TASK_DEFINITION_TAG") +@RestController +@RequestMapping("v2/tasks") +public class TaskDefinitionV2Controller extends BaseController { + + @Autowired + private TaskDefinitionService taskDefinitionService; + + /** + * Create resource task definition + * + * @param loginUser login user + * @param taskCreateRequest task definition json + * @return Result object created + */ + @ApiOperation(value = "create", notes = "CREATE_TASK_DEFINITION_NOTES") + @PostMapping(consumes = {"application/json"}) + @ResponseStatus(HttpStatus.CREATED) + @ApiException(CREATE_TASK_DEFINITION_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result createTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestBody TaskCreateRequest taskCreateRequest) { + TaskDefinition taskDefinition = taskDefinitionService.createTaskDefinitionV2(loginUser, taskCreateRequest); + return Result.success(taskDefinition); + } + + /** + * Delete resource task definition by code + * + * @param loginUser login user + * @param code task definition code + */ + @ApiOperation(value = "delete", notes = "DELETE_TASK_DEFINITION_VERSION_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", dataTypeClass = long.class, example = "123456", required = true) + }) + @DeleteMapping(value = "/{code}") + @ResponseStatus(HttpStatus.OK) + @ApiException(DELETE_SCHEDULE_BY_ID_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result deleteTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable("code") Long code) { + taskDefinitionService.deleteTaskDefinitionByCode(loginUser, code); + return Result.success(); + } + + /** + * Update resource task definition by code + * + * @param loginUser login user + * @param code task code of resource you want to update + * @param taskUpdateRequest workflowUpdateRequest + * @return ResourceResponse object updated + */ + @ApiOperation(value = "update", notes = "UPDATE_TASK_DEFINITION_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", dataTypeClass = long.class, example = "123456", required = true) + }) + @PutMapping(value = "/{code}") + @ResponseStatus(HttpStatus.OK) + @ApiException(UPDATE_TASK_DEFINITION_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result updateTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable("code") Long code, + @RequestBody TaskUpdateRequest taskUpdateRequest) { + TaskDefinition taskDefinition = + taskDefinitionService.updateTaskDefinitionV2(loginUser, code, taskUpdateRequest); + return Result.success(taskDefinition); + } + + /** + * Get resource task definition by code + * + * @param loginUser login user + * @param code task code of resource you want to update + * @return ResourceResponse object get from condition + */ + @ApiOperation(value = "get", notes = "GET_TASK_DEFINITION_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", dataTypeClass = long.class, example = "123456", required = true) + }) + @GetMapping(value = "/{code}") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_DETAIL_OF_TASK_DEFINITION_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result getTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable("code") Long code) { + TaskDefinition taskDefinition = taskDefinitionService.getTaskDefinition(loginUser, code); + return Result.success(taskDefinition); + } + + /** + * Get resource task definition according to query parameter + * + * @param loginUser login user + * @param taskFilterRequest workflowFilterRequest + * @return PageResourceResponse from condition + */ + @ApiOperation(value = "get", notes = "FILTER_TASK_DEFINITION_NOTES") + @PostMapping(value = "/query", consumes = {"application/json"}) + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_PROCESS_DEFINITION_LIST) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result> filterTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestBody TaskFilterRequest taskFilterRequest) { + PageInfo taskDefinitions = + taskDefinitionService.filterTaskDefinition(loginUser, taskFilterRequest); + return Result.success(taskDefinitions); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowV2Controller.java index 1270f01a68..c75ec2d379 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowV2Controller.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowV2Controller.java @@ -152,7 +152,7 @@ public class WorkflowV2Controller extends BaseController { * @return PageResourceResponse from condition */ @ApiOperation(value = "get", notes = "FILTER_WORKFLOWS_NOTES") - @GetMapping(consumes = {"application/json"}) + @PostMapping(value = "/query", consumes = {"application/json"}) @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_PROCESS_DEFINITION_LIST) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java new file mode 100644 index 0000000000..df8a06d378 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.dto.task; + +import static org.apache.dolphinscheduler.common.Constants.VERSION_FIRST; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; + +import java.util.Date; + +import lombok.Data; +import io.swagger.annotations.ApiModelProperty; + +/** + * task create request + */ +@Data +public class TaskCreateRequest { + + @ApiModelProperty(example = "workflow-code", required = true) + private long workflowCode; + @ApiModelProperty(example = "task-name", required = true) + private String name; + @ApiModelProperty(example = "describe what this task actual do", required = true) + private String description; + + @ApiModelProperty(example = "6816095515584", dataType = "Long") + private long projectCode; + + @ApiModelProperty(example = "SHELL", required = true) + private String taskType; + + // todo + @ApiModelProperty(example = "{\"localParams\": [], \"rawScript\": \"echo 1\", \"resourceList\": []}", required = true, notes = "task definition params") + private String taskParams; + + @ApiModelProperty(example = "YES", allowableValues = "YES,NO", notes = "default YES is not provided") + private String flag; + + @ApiModelProperty(example = "MEDIUM", allowableValues = "HIGHEST,HIGH,MEDIUM,LOW,LOWEST", notes = "default MEDIUM is not provided") + private String taskPriority; + + @ApiModelProperty(example = "default", notes = "default 'default' if not provided") + private String workerGroup; + + @ApiModelProperty(example = "6563415109312", dataType = "Long") + private long environmentCode; + + @ApiModelProperty(example = "0", dataType = "Integer", notes = "default 0 not provided") + private int failRetryTimes; + + @ApiModelProperty(example = "1") + private int failRetryInterval; + + @ApiModelProperty(example = "SHELL") + private int timeout; + + @ApiModelProperty(example = "CLOSE", allowableValues = "CLOSE,OPEN", notes = "default CLOSE is not provided") + private String timeoutFlag; + + @ApiModelProperty(example = "MEDIUM", allowableValues = "WARN,FAILED,WARNFAILED", notes = "default MEDIUM is not provided") + private String timeoutNotifyStrategy; + + @ApiModelProperty(example = "1,2,3") + private String resourceIds; + + @ApiModelProperty(example = "2") + private int taskGroupId; + + @ApiModelProperty(example = "1", dataType = "Integer", notes = "A priority number for execute task, the bigger the high priority, default null if not provided") + private int taskGroupPriority; + + @ApiModelProperty(example = "0.1", dataType = "Float", notes = "default unlimited if not provided") + private Integer cpuQuota; + + @ApiModelProperty(example = "0.1", dataType = "Float", notes = "default unlimited if not provided") + private Integer memoryMax; + + @ApiModelProperty(example = "upstream-task-codes1,upstream-task-codes2", notes = "use , to split multiple upstream task codes") + private String upstreamTasksCodes; + + public TaskDefinition convert2TaskDefinition() { + TaskDefinition taskDefinition = new TaskDefinition(); + + taskDefinition.setName(this.name); + taskDefinition.setDescription(this.description); + taskDefinition.setProjectCode(this.projectCode); + taskDefinition.setTaskType(this.taskType); + taskDefinition.setTaskParams(this.taskParams); + taskDefinition.setWorkerGroup(this.workerGroup == null ? Constants.DEFAULT_WORKER_GROUP : this.workerGroup); + taskDefinition.setEnvironmentCode(this.environmentCode); + taskDefinition.setFailRetryTimes(this.failRetryTimes); + taskDefinition.setFailRetryInterval(this.failRetryInterval); + taskDefinition.setTimeout(this.timeout); + taskDefinition.setResourceIds(this.resourceIds); + taskDefinition.setTaskGroupId(this.taskGroupId); + taskDefinition.setTaskGroupPriority(this.taskGroupPriority); + taskDefinition.setCpuQuota(this.cpuQuota); + taskDefinition.setMemoryMax(this.memoryMax); + + Flag flagCreate = this.flag == null ? Flag.YES : Flag.valueOf(this.flag); + taskDefinition.setFlag(flagCreate); + + TimeoutFlag timeoutFlagCreate = + this.timeoutFlag == null ? TimeoutFlag.CLOSE : TimeoutFlag.valueOf(this.timeoutFlag); + taskDefinition.setTimeoutFlag(timeoutFlagCreate); + + Priority taskPriorityCreate = this.taskPriority == null ? Priority.MEDIUM : Priority.valueOf(this.taskPriority); + taskDefinition.setTaskPriority(taskPriorityCreate); + + TaskTimeoutStrategy taskTimeoutStrategyCreate = this.timeoutNotifyStrategy == null ? TaskTimeoutStrategy.WARN + : TaskTimeoutStrategy.valueOf(this.timeoutNotifyStrategy); + taskDefinition.setTimeoutNotifyStrategy(taskTimeoutStrategyCreate); + + taskDefinition.setVersion(VERSION_FIRST); + Date date = new Date(); + taskDefinition.setCreateTime(date); + taskDefinition.setUpdateTime(date); + return taskDefinition; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskFilterRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskFilterRequest.java new file mode 100644 index 0000000000..14d29965f6 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskFilterRequest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.dto.task; + +import org.apache.dolphinscheduler.api.dto.PageQueryDto; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; + +import lombok.Data; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +/** + * task query request + */ +@ApiModel("TASK-QUERY") +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +public class TaskFilterRequest extends PageQueryDto { + + @ApiModelProperty(example = "project-name") + private String projectName; + + @ApiModelProperty(example = "task-name") + private String name; + + @ApiModelProperty(example = "SHELL") + private String taskType; + + public TaskDefinition convert2TaskDefinition() { + TaskDefinition taskDefinition = new TaskDefinition(); + if (this.projectName != null) { + taskDefinition.setProjectName(this.projectName); + } + if (this.name != null) { + taskDefinition.setName(this.name); + } + if (this.taskType != null) { + taskDefinition.setTaskType(this.taskType); + } + return taskDefinition; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskUpdateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskUpdateRequest.java new file mode 100644 index 0000000000..d0725b29cf --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskUpdateRequest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.dto.task; + +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; + +import org.apache.commons.beanutils.BeanUtils; + +import java.lang.reflect.InvocationTargetException; +import java.util.Date; + +import lombok.Data; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import io.swagger.annotations.ApiModelProperty; + +/** + * task update request + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +public class TaskUpdateRequest { + + @ApiModelProperty(example = "workflow-code", required = true) + private long workflowCode; + @ApiModelProperty(example = "task-name") + private String name; + @ApiModelProperty(example = "describe what this task actual do") + private String description; + + @ApiModelProperty(example = "SHELL") + private String taskType; + + // todo + @ApiModelProperty(example = "{\"localParams\": [], \"rawScript\": \"echo 1\", \"resourceList\": []}", notes = "task definition params") + private String taskParams; + + @ApiModelProperty(example = "YES", allowableValues = "YES,NO", notes = "default YES is not provided") + private String flag; + + @ApiModelProperty(example = "MEDIUM", allowableValues = "HIGHEST,HIGH,MEDIUM,LOW,LOWEST", notes = "default MEDIUM is not provided") + private String taskPriority; + + @ApiModelProperty(example = "default", notes = "default 'default' if not provided") + private String workerGroup; + + @ApiModelProperty(example = "6563415109312", dataType = "Long") + private long environmentCode; + + @ApiModelProperty(example = "0", dataType = "Integer", notes = "default 0 not provided") + private int failRetryTimes; + + @ApiModelProperty(example = "1") + private int failRetryInterval; + + @ApiModelProperty(example = "SHELL") + private int timeout; + + @ApiModelProperty(example = "CLOSE", allowableValues = "CLOSE,OPEN", notes = "default CLOSE is not provided") + private String timeoutFlag; + + @ApiModelProperty(example = "MEDIUM", allowableValues = "WARN,FAILED,WARNFAILED", notes = "default MEDIUM is not provided") + private String timeoutNotifyStrategy; + + @ApiModelProperty(example = "1,2,3") + private String resourceIds; + + @ApiModelProperty(example = "2") + private int taskGroupId; + + @ApiModelProperty(example = "1", dataType = "Integer", notes = "A priority number for execute task, the bigger the high priority, default null if not provided") + private int taskGroupPriority; + + @ApiModelProperty(example = "0.1", dataType = "Float", notes = "default unlimited if not provided") + private Integer cpuQuota; + + @ApiModelProperty(example = "0.1", dataType = "Float", notes = "default unlimited if not provided") + private Integer memoryMax; + + @ApiModelProperty(example = "upstream-task-codes1,upstream-task-codes2", notes = "use , to split multiple upstream task codes") + private String upstreamTasksCodes; + + /** + * Merge taskUpdateRequest information into exists task definition object + * + * @param taskDefinition exists task definition object + * @return task definition + */ + public TaskDefinition mergeIntoTaskDefinition(TaskDefinition taskDefinition) throws InvocationTargetException, IllegalAccessException, InstantiationException, NoSuchMethodException { + TaskDefinition taskDefinitionDeepCopy = (TaskDefinition) BeanUtils.cloneBean(taskDefinition); + assert taskDefinitionDeepCopy != null; + if (this.name != null) { + taskDefinitionDeepCopy.setName(this.name); + } + if (this.description != null) { + taskDefinitionDeepCopy.setDescription(this.description); + } + if (this.taskType != null) { + taskDefinitionDeepCopy.setTaskType(this.taskType); + } + if (this.taskParams != null) { + taskDefinitionDeepCopy.setTaskParams(this.taskParams); + } + if (this.flag != null) { + taskDefinitionDeepCopy.setFlag(Flag.valueOf(this.flag)); + } + if (this.taskPriority != null) { + taskDefinitionDeepCopy.setTaskPriority(Priority.valueOf(this.taskPriority)); + } + if (this.workerGroup != null) { + taskDefinitionDeepCopy.setWorkerGroup(this.workerGroup); + } + if (this.environmentCode != 0L) { + taskDefinitionDeepCopy.setEnvironmentCode(this.environmentCode); + } + if (this.failRetryTimes != 0) { + taskDefinitionDeepCopy.setFailRetryTimes(this.failRetryTimes); + } + if (this.failRetryInterval != 0) { + taskDefinitionDeepCopy.setFailRetryInterval(this.failRetryInterval); + } + if (this.timeout != 0) { + taskDefinitionDeepCopy.setTimeout(this.timeout); + } + if (this.timeoutFlag != null) { + taskDefinitionDeepCopy.setTimeoutFlag(TimeoutFlag.valueOf(this.timeoutFlag)); + } + if (this.timeoutNotifyStrategy != null) { + taskDefinitionDeepCopy.setTimeoutNotifyStrategy(TaskTimeoutStrategy.valueOf(this.timeoutNotifyStrategy)); + } + if (this.resourceIds != null) { + taskDefinitionDeepCopy.setResourceIds(this.resourceIds); + } + if (this.taskGroupId != 0) { + taskDefinitionDeepCopy.setTaskGroupId(this.taskGroupId); + } + if (this.taskGroupPriority != 0) { + taskDefinitionDeepCopy.setTaskGroupPriority(this.taskGroupPriority); + } + if (this.cpuQuota != null) { + taskDefinitionDeepCopy.setCpuQuota(this.cpuQuota); + } + if (this.memoryMax != null) { + taskDefinitionDeepCopy.setMemoryMax(this.memoryMax); + } + + if (!taskDefinition.equals(taskDefinitionDeepCopy)) { + int version = taskDefinitionDeepCopy.getVersion() + 1; + taskDefinitionDeepCopy.setVersion(version); + taskDefinitionDeepCopy.setUpdateTime(new Date()); + } + return taskDefinitionDeepCopy; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationCreateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationCreateRequest.java new file mode 100644 index 0000000000..05e6b7b4f0 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationCreateRequest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.dto.taskRelation; + +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; + +import java.util.Date; + +import lombok.Data; +import io.swagger.annotations.ApiModelProperty; + +/** + * task relation create request + */ +@Data +public class TaskRelationCreateRequest { + + @ApiModelProperty(example = "12345678") + private long projectCode; + + @ApiModelProperty(example = "87654321", required = true) + private long workflowCode; + + @ApiModelProperty(example = "12345", required = true) + private long preTaskCode; + + @ApiModelProperty(example = "54321", required = true) + private long postTaskCode; + + public ProcessTaskRelation convert2ProcessTaskRelation() { + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + + processTaskRelation.setProjectCode(this.projectCode); + processTaskRelation.setProcessDefinitionCode(this.workflowCode); + processTaskRelation.setPreTaskCode(this.preTaskCode); + processTaskRelation.setPostTaskCode(this.postTaskCode); + + Date date = new Date(); + processTaskRelation.setCreateTime(date); + processTaskRelation.setUpdateTime(date); + return processTaskRelation; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationDeleteRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationDeleteRequest.java new file mode 100644 index 0000000000..b6dde3d078 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationDeleteRequest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.dto.taskRelation; + +import static org.apache.dolphinscheduler.common.Constants.COMMA; + +import java.util.stream.Stream; + +import lombok.Data; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import io.swagger.annotations.ApiModelProperty; + +/** + * task relation want to delete request + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +public class TaskRelationDeleteRequest { + + private long upstreamCode; + private long downstreamCode; + @ApiModelProperty(example = "12345678,87654321", required = true, notes = "relation pair want to delete relation, separated by comma") + private String codePair; + + public TaskRelationDeleteRequest(String relationPair) { + long[] relations = Stream.of(relationPair.split(COMMA)) + .map(String::trim) + .mapToLong(Long::parseLong) + .toArray(); + this.upstreamCode = relations[0]; + this.downstreamCode = relations[1]; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationFilterRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationFilterRequest.java new file mode 100644 index 0000000000..39e32dcad7 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationFilterRequest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.dto.taskRelation; + +import org.apache.dolphinscheduler.api.dto.PageQueryDto; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; + +import lombok.Data; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +/** + * task relation query request + */ +@ApiModel("TASK-RELATION-QUERY") +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +public class TaskRelationFilterRequest extends PageQueryDto { + + @ApiModelProperty(example = "1234567890123") + private long workflowCode; + + @ApiModelProperty(example = "1234567890123") + private long preTaskCode; + + @ApiModelProperty(example = "1234567890123") + private long postTaskCode; + + public TaskRelationFilterRequest(long workflowCode, long preTaskCode, long postTaskCode) { + this.workflowCode = workflowCode; + this.preTaskCode = preTaskCode; + this.postTaskCode = postTaskCode; + } + + public TaskRelationFilterRequest(long preTaskCode, long postTaskCode) { + this.preTaskCode = preTaskCode; + this.postTaskCode = postTaskCode; + } + + public ProcessTaskRelation convert2TaskDefinition() { + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + if (this.workflowCode != 0L) { + processTaskRelation.setProcessDefinitionCode(this.workflowCode); + } + if (this.preTaskCode != 0L) { + processTaskRelation.setPreTaskCode(this.preTaskCode); + } + if (this.postTaskCode != 0L) { + processTaskRelation.setPostTaskCode(this.postTaskCode); + } + return processTaskRelation; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationUpdateUpstreamRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationUpdateUpstreamRequest.java new file mode 100644 index 0000000000..49f85cd697 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationUpdateUpstreamRequest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.dto.taskRelation; + +import static org.apache.dolphinscheduler.common.Constants.COMMA; + +import org.apache.dolphinscheduler.api.dto.PageQueryDto; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.Data; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import io.swagger.annotations.ApiModelProperty; + +/** + * task relation update request + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +public class TaskRelationUpdateUpstreamRequest extends PageQueryDto { + + @ApiModelProperty(example = "1234587654321", notes = "workflow code ") + private long workflowCode; + + @ApiModelProperty(example = "12345678,87654321", required = true, notes = "upstream you want to update separated by comma") + private String upstreams; + + public List getUpstreams() { + return Stream.of(this.upstreams.split(COMMA)) + .map(String::trim) + .map(Long::parseLong) + .collect(Collectors.toList()); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java index 44aa86df67..103bf5612e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java @@ -62,6 +62,15 @@ public class WorkflowUpdateRequest { @ApiModelProperty(allowableValues = "PARALLEL / SERIAL_WAIT / SERIAL_DISCARD / SERIAL_PRIORITY", example = "PARALLEL", notes = "default PARALLEL if not provide.") private String executionType; + @ApiModelProperty(example = "[{\\\"taskCode\\\":7009653961024,\\\"x\\\":312,\\\"y\\\":196}]") + private String location; + + /** + * Merge workflowUpdateRequest information into exists processDefinition object + * + * @param processDefinition exists processDefinition object + * @return process definition + */ public ProcessDefinition mergeIntoProcessDefinition(ProcessDefinition processDefinition) { ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class); @@ -90,6 +99,9 @@ public class WorkflowUpdateRequest { if (this.executionType != null) { processDefinitionDeepCopy.setExecutionType(ProcessExecutionTypeEnum.valueOf(this.executionType)); } + if (this.location != null) { + processDefinitionDeepCopy.setLocations(this.location); + } int version = processDefinitionDeepCopy.getVersion() + 1; processDefinitionDeepCopy.setVersion(version); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 0581c32668..147a62c76e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -357,6 +357,19 @@ public enum Status { BATCH_EXECUTE_PROCESS_INSTANCE_ERROR(50058, "change process instance status error: {0}", "修改工作实例状态错误: {0}"), START_TASK_INSTANCE_ERROR(50059, "start task instance error", "运行任务流实例错误"), DELETE_PROCESS_DEFINE_ERROR(50060, "delete process definition [{0}] error: {1}", "删除工作流定义[{0}]错误: {1}"), + CREATE_TASK_DEFINITION_LOG_ERROR(50061, "create task definition log {0} error", "创建任务操作记录 {0} 错误"), + DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR(50062, "delete task definition {0} error", "删除任务定义 {0} 错误"), + TASK_DEFINITION_NOT_CHANGE(50063, "task definition {0} do not change", "任务定义 {0} 没有变化"), + TASK_DEFINITION_NOT_EXISTS(50064, "task definition {0} do not exists", "任务定义 {0} 不存在"), + UPDATE_UPSTREAM_TASK_PROCESS_RELATION_ERROR(50065, "update task upstream relation error", "更新任务上游关系错误"), + CREATE_PROCESS_TASK_RELATION_LOG_ERROR(50066, "create process task relation log {0}-{1} error", + "创建任务关系日志 {0}-{1} 错误"), + PROCESS_TASK_RELATION_NOT_EXPECT(50067, "process task relation number not expect, expect {0} but get {1}", + "工作流任务关系数量不符合预期,预期 {0} 但是实际 {1}"), + PROCESS_TASK_RELATION_BATCH_DELETE_ERROR(50068, "batch delete process task relation {0} error", + "批量删除工作流任务关系 {0} 错误"), + PROCESS_TASK_RELATION_BATCH_CREATE_ERROR(50069, "batch create process task relation {0} error", + "批量创建工作流任务关系 {0} 错误"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java index f31cc8be48..7a31d49916 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java @@ -17,8 +17,12 @@ package org.apache.dolphinscheduler.api.service; +import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationCreateRequest; +import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.User; +import java.util.List; import java.util.Map; /** @@ -42,6 +46,16 @@ public interface ProcessTaskRelationService { long preTaskCode, long postTaskCode); + /** + * create resource process task relation + * + * @param loginUser login user + * @param taskRelationCreateRequest project code + * @return ProcessTaskRelation object + */ + ProcessTaskRelation createProcessTaskRelationV2(User loginUser, + TaskRelationCreateRequest taskRelationCreateRequest); + /** * delete process task relation * @@ -56,6 +70,28 @@ public interface ProcessTaskRelationService { long processDefinitionCode, long taskCode); + /** + * delete process task relation, will delete exists relation preTaskCode -> postTaskCode, throw error if not exists + * + * @param loginUser login user + * @param preTaskCode relation upstream code + * @param postTaskCode relation downstream code + */ + void deleteTaskProcessRelationV2(User loginUser, + long preTaskCode, + long postTaskCode); + + /** + * delete process task relation, will delete exists relation upstream -> downstream, throw error if not exists + * + * @param loginUser login user + * @param taskCode relation upstream code + * @param taskRelationUpdateUpstreamRequest relation downstream code + */ + List updateUpstreamTaskDefinition(User loginUser, + long taskCode, + TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest); + /** * delete task upstream relation * @@ -118,5 +154,6 @@ public interface ProcessTaskRelationService { * @param postTaskCode post task code * @return delete result code */ - Map deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode); + Map deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, + long postTaskCode); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java index 0a84ec7553..40923db839 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java @@ -17,9 +17,14 @@ package org.apache.dolphinscheduler.api.service; +import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest; +import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest; +import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest; +import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.TaskExecuteType; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.User; import java.util.Map; @@ -40,6 +45,16 @@ public interface TaskDefinitionService { long projectCode, String taskDefinitionJson); + /** + * Create resource task definition + * + * @param loginUser login user + * @param taskCreateRequest task definition json + * @Return new TaskDefinition have created + */ + TaskDefinition createTaskDefinitionV2(User loginUser, + TaskCreateRequest taskCreateRequest); + /** * create single task definition that binds the workflow * @@ -70,15 +85,16 @@ public interface TaskDefinitionService { String taskName); /** - * delete task definition + * Delete resource task definition by code + * + * Only task release state offline and no downstream tasks can be deleted, will also remove the exists + * task relation [upstreamTaskCode, taskCode] * * @param loginUser login user - * @param projectCode project code * @param taskCode task code */ - Map deleteTaskDefinitionByCode(User loginUser, - long projectCode, - long taskCode); + void deleteTaskDefinitionByCode(User loginUser, + long taskCode); /** * update task definition @@ -93,6 +109,38 @@ public interface TaskDefinitionService { long taskCode, String taskDefinitionJsonObj); + /** + * Update resource task definition by code + * + * @param loginUser login user + * @param taskCode task code + * @param taskUpdateRequest task definition json object + * @return new TaskDefinition have updated + */ + TaskDefinition updateTaskDefinitionV2(User loginUser, + long taskCode, + TaskUpdateRequest taskUpdateRequest); + + /** + * Get resource task definition by code + * + * @param loginUser login user + * @param taskCode task code + * @return TaskDefinition + */ + TaskDefinition getTaskDefinition(User loginUser, + long taskCode); + + /** + * Get resource task definition according to query parameter + * + * @param loginUser login user + * @param taskFilterRequest taskFilterRequest + * @return PageResourceResponse from condition + */ + PageInfo filterTaskDefinition(User loginUser, + TaskFilterRequest taskFilterRequest); + /** * update task definition and upstream * @@ -204,7 +252,7 @@ public interface TaskDefinitionService { * @return update result code */ Map releaseTaskDefinition(User loginUser, - long projectCode, - long code, - ReleaseState releaseState); + long projectCode, + long code, + ReleaseState releaseState); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index 79bcc951e3..859483cac6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -21,6 +21,9 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; +import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationCreateRequest; +import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationFilterRequest; +import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; @@ -35,6 +38,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; @@ -59,6 +63,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.google.common.collect.Lists; /** @@ -90,6 +96,9 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P @Autowired private ProcessService processService; + @Autowired + private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + /** * create process task relation * @@ -165,6 +174,85 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } + private ProcessTaskRelationLog persist2ProcessTaskRelationLog(User user, ProcessTaskRelation processTaskRelation) { + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); + processTaskRelationLog.setOperator(user.getId()); + processTaskRelationLog.setOperateTime(new Date()); + int result = processTaskRelationLogMapper.insert(processTaskRelationLog); + if (result <= 0) { + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR, + processTaskRelationLog.getPreTaskCode(), processTaskRelationLog.getPostTaskCode()); + } + return processTaskRelationLog; + } + + private List batchPersist2ProcessTaskRelationLog(User user, + List processTaskRelations) { + Date now = new Date(); + List processTaskRelationLogs = new ArrayList<>(); + + for (ProcessTaskRelation processTaskRelation : processTaskRelations) { + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); + processTaskRelationLog.setOperator(user.getId()); + processTaskRelationLog.setOperateTime(now); + processTaskRelationLogs.add(processTaskRelationLog); + } + int result = processTaskRelationLogMapper.batchInsert(processTaskRelationLogs); + if (result != processTaskRelationLogs.size()) { + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR); + } + return processTaskRelationLogs; + } + + private void updateVersions(ProcessTaskRelation processTaskRelation) { + // workflow + ProcessDefinition processDefinition = + processDefinitionMapper.queryByCode(processTaskRelation.getProcessDefinitionCode()); + processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion()); + + // tasks + TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(processTaskRelation.getPreTaskCode()); + processTaskRelation.setPreTaskVersion(preTaskDefinition.getVersion()); + TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(processTaskRelation.getPostTaskCode()); + processTaskRelation.setPostTaskVersion(postTaskDefinition.getVersion()); + } + + /** + * create resource process task relation + * + * @param loginUser login user + * @param taskRelationCreateRequest project code + * @return ProcessTaskRelation object + */ + @Override + @Transactional + public ProcessTaskRelation createProcessTaskRelationV2(User loginUser, + TaskRelationCreateRequest taskRelationCreateRequest) { + ProcessTaskRelation processTaskRelation = taskRelationCreateRequest.convert2ProcessTaskRelation(); + ProcessDefinition processDefinition = + processDefinitionMapper.queryByCode(processTaskRelation.getProcessDefinitionCode()); + if (processDefinition == null) { + throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, + String.valueOf(processTaskRelation.getProcessDefinitionCode())); + } + if (processTaskRelation.getProjectCode() == 0) { + processTaskRelation.setProjectCode(processDefinition.getProjectCode()); + } + Project project = projectMapper.queryByCode(processTaskRelation.getProjectCode()); + projectService.checkProjectAndAuthThrowException(loginUser, project, null); + + // persistence process task relation and process task relation log to database + this.updateVersions(processTaskRelation); + int insert = processTaskRelationMapper.insert(processTaskRelation); + if (insert <= 0) { + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR, processTaskRelation.getPreTaskCode(), + processTaskRelation.getPostTaskCode()); + } + this.persist2ProcessTaskRelationLog(loginUser, processTaskRelation); + + return processTaskRelation; + } + private ProcessTaskRelation setRelation(ProcessDefinition processDefinition, TaskDefinition taskDefinition) { Date now = new Date(); ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); @@ -184,11 +272,13 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P ProcessDefinition processDefinition) { int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); if (insertVersion <= 0) { - logger.error("Update process definition error, projectCode:{}, processDefinitionCode:{}.", processDefinition.getProjectCode(), processDefinition.getCode()); + logger.error("Update process definition error, projectCode:{}, processDefinitionCode:{}.", + processDefinition.getProjectCode(), processDefinition.getCode()); putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } else - logger.info("Update process definition complete, new version is {}, projectCode:{}, processDefinitionCode:{}.", + logger.info( + "Update process definition complete, new version is {}, projectCode:{}, processDefinitionCode:{}.", insertVersion, processDefinition.getProjectCode(), processDefinition.getCode()); processDefinition.setVersion(insertVersion); } @@ -213,7 +303,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } if (taskCode == 0) { - logger.error("Delete task process relation error due to parameter taskCode is 0, projectCode:{}, processDefinitionCode:{}.", + logger.error( + "Delete task process relation error due to parameter taskCode is 0, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinitionCode); putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR); return result; @@ -234,7 +325,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); List processTaskRelationList = Lists.newArrayList(processTaskRelations); if (CollectionUtils.isEmpty(processTaskRelationList)) { - logger.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinitionCode); + logger.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode, + processDefinitionCode); putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); return result; } @@ -249,7 +341,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } if (CollectionUtils.isNotEmpty(downstreamList)) { String downstream = StringUtils.join(downstreamList, ","); - logger.warn("Relation can not be deleted because task has downstream tasks:[{}], projectCode:{}, processDefinitionCode:{}, taskDefinitionCode:{}.", + logger.warn( + "Relation can not be deleted because task has downstream tasks:[{}], projectCode:{}, processDefinitionCode:{}, taskDefinitionCode:{}.", downstream, projectCode, processDefinitionCode, taskCode); putMsg(result, Status.TASK_HAS_DOWNSTREAM, downstream); return result; @@ -265,12 +358,125 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); } else - logger.info("Delete {} type task definition complete, taskDefinitionCode:{}.", taskDefinition.getTaskType(), taskCode); + logger.info("Delete {} type task definition complete, taskDefinitionCode:{}.", + taskDefinition.getTaskType(), taskCode); } putMsg(result, Status.SUCCESS); return result; } + /** + * delete process task relation, will delete exists relation preTaskCode -> postTaskCode, throw error if not exists + * + * @param loginUser login user + * @param preTaskCode relation upstream code + * @param postTaskCode relation downstream code + */ + @Override + @Transactional + public void deleteTaskProcessRelationV2(User loginUser, + long preTaskCode, + long postTaskCode) { + ProcessTaskRelation processTaskRelation = + new TaskRelationFilterRequest(preTaskCode, postTaskCode).convert2TaskDefinition(); + + Page page = + new Page<>(new TaskRelationFilterRequest(preTaskCode, postTaskCode).getPageNo(), + new TaskRelationFilterRequest(preTaskCode, postTaskCode).getPageSize()); + IPage processTaskRelationIPage = + processTaskRelationMapper.filterProcessTaskRelation(page, processTaskRelation); + + List processTaskRelations = processTaskRelationIPage.getRecords(); + if (processTaskRelations.size() != 1) { + throw new ServiceException(Status.PROCESS_TASK_RELATION_NOT_EXPECT, 1, processTaskRelations.size()); + } + + ProcessTaskRelation processTaskRelationDb = processTaskRelations.get(0); + Project project = projectMapper.queryByCode(processTaskRelationDb.getProjectCode()); + projectService.checkProjectAndAuthThrowException(loginUser, project, null); + processTaskRelationMapper.deleteById(processTaskRelationDb.getId()); + } + + /** + * delete process task relation, will delete exists relation upstream -> downstream, throw error if not exists + * + * @param loginUser login user + * @param taskCode relation upstream code + * @param taskRelationUpdateUpstreamRequest relation downstream code + */ + @Override + @Transactional + public List updateUpstreamTaskDefinition(User loginUser, + long taskCode, + TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) { + TaskDefinition downstreamTask = taskDefinitionMapper.queryByCode(taskCode); + if (downstreamTask == null) { + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode); + } + List upstreamTaskCodes = taskRelationUpdateUpstreamRequest.getUpstreams(); + + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setPostTaskCode(taskCode); + + Page page = new Page<>(taskRelationUpdateUpstreamRequest.getPageNo(), + taskRelationUpdateUpstreamRequest.getPageSize()); + IPage processTaskRelationExistsIPage = + processTaskRelationMapper.filterProcessTaskRelation(page, processTaskRelation); + List processTaskRelationExists = processTaskRelationExistsIPage.getRecords(); + + ProcessDefinition processDefinition = null; + if (CollectionUtils.isNotEmpty(processTaskRelationExists)) { + processDefinition = + processDefinitionMapper.queryByCode(processTaskRelationExists.get(0).getProcessDefinitionCode()); + } else if (taskRelationUpdateUpstreamRequest.getWorkflowCode() != 0L) { + processDefinition = + processDefinitionMapper.queryByCode(taskRelationUpdateUpstreamRequest.getWorkflowCode()); + } + if (processDefinition == null) { + throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, + taskRelationUpdateUpstreamRequest.toString()); + } + + // get new relation to create and out of date relation to delete + List taskCodeCreates = upstreamTaskCodes + .stream() + .filter(upstreamTaskCode -> processTaskRelationExists.stream().noneMatch( + processTaskRelationExist -> processTaskRelationExist.getPreTaskCode() == upstreamTaskCode)) + .collect(Collectors.toList()); + List taskCodeDeletes = processTaskRelationExists + .stream() + .map(ProcessTaskRelation::getPreTaskCode) + .filter(preTaskCode -> !upstreamTaskCodes.contains(preTaskCode)) + .collect(Collectors.toList()); + + // delete relation not exists + if (CollectionUtils.isNotEmpty(taskCodeDeletes)) { + int delete = processTaskRelationMapper.deleteBatchIds(taskCodeDeletes); + if (delete != taskCodeDeletes.size()) { + throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_DELETE_ERROR, taskCodeDeletes); + } + } + + // create relation not exists + List processTaskRelations = new ArrayList<>(); + for (long createCode : taskCodeCreates) { + TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode); + ProcessTaskRelation processTaskRelationCreate = + new ProcessTaskRelation(null, processDefinition.getVersion(), downstreamTask.getProjectCode(), + processDefinition.getCode(), upstreamTask.getCode(), upstreamTask.getVersion(), + downstreamTask.getCode(), downstreamTask.getVersion(), null, null); + processTaskRelations.add(processTaskRelationCreate); + } + int batchInsert = processTaskRelationMapper.batchInsert(processTaskRelations); + if (batchInsert != processTaskRelations.size()) { + throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_CREATE_ERROR, taskCodeCreates); + } + + // batch sync to process task relation log + this.batchPersist2ProcessTaskRelationLog(loginUser, processTaskRelations); + return processTaskRelations; + } + private void updateRelation(User loginUser, Map result, ProcessDefinition processDefinition, List processTaskRelationList) { List relationLogs = @@ -279,12 +485,14 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P processDefinition.getCode(), processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE); if (insertResult == Constants.EXIT_CODE_SUCCESS) { - logger.info("Update task relations complete, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.", + logger.info( + "Update task relations complete, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.", processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion()); putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); } else { - logger.error("Update task relations error, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.", + logger.error( + "Update task relations error, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.", processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion()); putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); @@ -348,7 +556,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode()); if (processDefinition == null) { - logger.error("Process definition does not exist, processDefinitionCode:{}.", upstreamList.get(0).getProcessDefinitionCode()); + logger.error("Process definition does not exist, processDefinitionCode:{}.", + upstreamList.get(0).getProcessDefinitionCode()); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(upstreamList.get(0).getProcessDefinitionCode())); return result; @@ -418,7 +627,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(downstreamList.get(0).getProcessDefinitionCode()); if (processDefinition == null) { - logger.error("Process definition does not exist, processDefinitionCode:{}.", downstreamList.get(0).getProcessDefinitionCode()); + logger.error("Process definition does not exist, processDefinitionCode:{}.", + downstreamList.get(0).getProcessDefinitionCode()); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(downstreamList.get(0).getProcessDefinitionCode())); return result; @@ -530,7 +740,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); if (processDefinition == null) { - logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinitionCode); + logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, + processDefinitionCode); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode)); return result; } @@ -538,7 +749,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); List processTaskRelationList = Lists.newArrayList(processTaskRelations); if (CollectionUtils.isEmpty(processTaskRelationList)) { - logger.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinitionCode); + logger.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode, + processDefinitionCode); putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); return result; } @@ -561,12 +773,14 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P if (processTaskRelation.getPreTaskCode() == preTaskCode) { int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId()); if (delete == 0) { - logger.error("Delete task relation edge error, processTaskRelationId:{}, preTaskCode:{}, postTaskCode:{}", + logger.error( + "Delete task relation edge error, processTaskRelationId:{}, preTaskCode:{}, postTaskCode:{}", processTaskRelation.getId(), preTaskCode, postTaskCode); putMsg(result, Status.DELETE_EDGE_ERROR); throw new ServiceException(Status.DELETE_EDGE_ERROR); } else - logger.info("Delete task relation edge complete, processTaskRelationId:{}, preTaskCode:{}, postTaskCode:{}", + logger.info( + "Delete task relation edge complete, processTaskRelationId:{}, preTaskCode:{}, postTaskCode:{}", processTaskRelation.getId(), preTaskCode, postTaskCode); processTaskRelationList.remove(processTaskRelation); } @@ -577,7 +791,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P processTaskRelation.setPreTaskVersion(0); processTaskRelation.setPreTaskCode(0L); processTaskRelationList.add(processTaskRelation); - logger.info("Delete task relation through set invalid value for it: preTaskCode from {} to 0, processTaskRelationId:{}.", + logger.info( + "Delete task relation through set invalid value for it: preTaskCode from {} to 0, processTaskRelationId:{}.", preTaskCode, processTaskRelation.getId()); } updateProcessDefiniteVersion(loginUser, result, processDefinition); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 947dfc2639..f60d85e3e8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -22,11 +22,19 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_VERSION_VIEW; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION; +import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest; +import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest; +import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest; +import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest; +import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.permission.PermissionCheck; +import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; +import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -60,6 +68,7 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -105,6 +114,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; + @Autowired + private ProcessTaskRelationService processTaskRelationService; + @Autowired private ProcessDefinitionMapper processDefinitionMapper; @@ -114,6 +126,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Autowired private TaskPluginManager taskPluginManager; + @Autowired + private ProcessDefinitionService processDefinitionService; + /** * create task definition * @@ -166,6 +181,91 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } + private TaskDefinitionLog persist2TaskDefinitionLog(User user, TaskDefinition taskDefinition) { + TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition); + taskDefinitionLog.setOperator(user.getId()); + taskDefinitionLog.setOperateTime(new Date()); + int result = taskDefinitionLogMapper.insert(taskDefinitionLog); + if (result <= 0) { + throw new ServiceException(Status.CREATE_TASK_DEFINITION_LOG_ERROR, taskDefinitionLog.getName()); + } + return taskDefinitionLog; + } + + private void checkTaskDefinitionValid(User user, TaskDefinition taskDefinition, String permissions) { + // check user access for project + Project project = projectMapper.queryByCode(taskDefinition.getProjectCode()); + projectService.checkProjectAndAuthThrowException(user, project, permissions); + + if (!taskPluginManager.checkTaskParameters(ParametersNode.builder() + .taskType(taskDefinition.getTaskType()) + .taskParams(taskDefinition.getTaskParams()) + .dependence(taskDefinition.getDependence()) + .build())) { + throw new ServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName()); + } + } + + private List updateTaskUpstreams(User user, long workflowCode, long taskCode, + String upstreamCodes) { + TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest = new TaskRelationUpdateUpstreamRequest(); + taskRelationUpdateUpstreamRequest.setWorkflowCode(workflowCode); + taskRelationUpdateUpstreamRequest.setUpstreams(upstreamCodes); + return processTaskRelationService.updateUpstreamTaskDefinition(user, taskCode, + taskRelationUpdateUpstreamRequest); + } + + private ProcessDefinition updateWorkflowLocation(User user, ProcessDefinition processDefinition) { + WorkflowUpdateRequest workflowUpdateRequest = new WorkflowUpdateRequest(); + workflowUpdateRequest.setLocation(null); + return processDefinitionService.updateSingleProcessDefinition(user, processDefinition.getCode(), + workflowUpdateRequest); + } + + /** + * Create resource task definition + * + * @param loginUser login user + * @param taskCreateRequest task definition json + * @return new TaskDefinition have created + */ + @Override + @Transactional + public TaskDefinition createTaskDefinitionV2(User loginUser, + TaskCreateRequest taskCreateRequest) { + TaskDefinition taskDefinition = taskCreateRequest.convert2TaskDefinition(); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(taskCreateRequest.getWorkflowCode()); + if (processDefinition == null) { + throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, taskCreateRequest.getWorkflowCode()); + } + // Add project code from process definition if not exists + if (taskDefinition.getProjectCode() == 0L) { + taskDefinition.setProjectCode(processDefinition.getProjectCode()); + } + this.checkTaskDefinitionValid(loginUser, taskDefinition, TASK_DEFINITION_CREATE); + + long taskDefinitionCode; + try { + taskDefinitionCode = CodeGenerateUtils.getInstance().genCode(); + } catch (CodeGenerateException e) { + throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS); + } + taskDefinition.setCode(taskDefinitionCode); + + int create = taskDefinitionMapper.insert(taskDefinition); + if (create <= 0) { + throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); + } + this.persist2TaskDefinitionLog(loginUser, taskDefinition); + + // update related objects: task relationship, workflow's location(need to set to null and front-end will auto + // format it) + this.updateTaskUpstreams(loginUser, taskCreateRequest.getWorkflowCode(), taskDefinition.getCode(), + taskCreateRequest.getUpstreamTasksCodes()); + this.updateWorkflowLocation(loginUser, processDefinition); + return taskDefinition; + } + /** * create single task definition that binds the workflow * @@ -323,82 +423,71 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } /** - * delete task definition - * Only offline and no downstream dependency can be deleted - * - * @param loginUser login user - * @param projectCode project code - * @param taskCode task code + * Whether task definition can be deleted or not */ - @Transactional - @Override - public Map deleteTaskDefinitionByCode(User loginUser, long projectCode, long taskCode) { - Project project = projectMapper.queryByCode(projectCode); + private void taskCanDeleteValid(User user, TaskDefinition taskDefinition) { // check user access for project - Map result = - projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_DELETE); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - return result; - } - if (taskCode == 0) { - logger.warn("Parameter taskCode 0 is invalid."); - putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); - return result; - } - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); - if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) { - logger.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode); - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); - return result; - } - if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) { - logger.warn("Task definition can not be deleted due to task state online, taskDefinitionCode:{}.", - taskCode); - putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode); - return result; + Project project = projectMapper.queryByCode(taskDefinition.getProjectCode()); + projectService.checkProjectAndAuthThrowException(user, project, TASK_DEFINITION_DELETE); + + // Whether task relation workflow is online + if (processService.isTaskOnline(taskDefinition.getCode()) && taskDefinition.getFlag() == Flag.YES) { + throw new ServiceException(Status.TASK_DEFINE_STATE_ONLINE, taskDefinition.getCode()); } + + // Whether task have downstream tasks List processTaskRelationList = - processTaskRelationMapper.queryDownstreamByTaskCode(taskCode); + processTaskRelationMapper.queryDownstreamByTaskCode(taskDefinition.getCode()); if (!processTaskRelationList.isEmpty()) { Set postTaskCodes = processTaskRelationList .stream() .map(ProcessTaskRelation::getPostTaskCode) .collect(Collectors.toSet()); - String postTaskCodesStr = StringUtils.join(postTaskCodes, ","); - logger.warn( - "Task definition can not be deleted due to downstream tasks, taskDefinitionCode:{}, downstreamTaskCodes:{}", - taskCode, postTaskCodesStr); - putMsg(result, Status.TASK_HAS_DOWNSTREAM, postTaskCodesStr); - return result; + String postTaskCodesStr = StringUtils.join(postTaskCodes, Constants.COMMA); + throw new ServiceException(Status.TASK_HAS_DOWNSTREAM, postTaskCodesStr); + } + } + + /** + * Delete resource task definition by code + * + * Only task release state offline and no downstream tasks can be deleted, will also remove the exists + * task relation [upstreamTaskCode, taskCode] + * + * @param loginUser login user + * @param taskCode task code + */ + @Transactional + @Override + public void deleteTaskDefinitionByCode(User loginUser, long taskCode) { + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); + if (taskDefinition == null) { + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode); } + + this.taskCanDeleteValid(loginUser, taskDefinition); int delete = taskDefinitionMapper.deleteByCode(taskCode); - if (delete > 0) { - List taskRelationList = - processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); - if (!taskRelationList.isEmpty()) { - logger.info( - "Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}.", - taskCode); - long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); - List relationList = processTaskRelations.stream() - .filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList()); - updateDag(loginUser, result, processDefinitionCode, relationList, Lists.newArrayList()); - } else { - logger.info("Task definition delete complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, - taskCode); - putMsg(result, Status.SUCCESS); - } - } else { - logger.error("Task definition delete error, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode); - putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); - throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); + if (delete <= 0) { + throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR, taskDefinition.getCode()); + } + + // Delete task upstream tasks if exists + List taskRelationList = + processTaskRelationMapper.queryUpstreamByCode(taskDefinition.getProjectCode(), taskCode); + if (!taskRelationList.isEmpty()) { + logger.debug( + "Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}.", + taskCode); + long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); + List processTaskRelations = processTaskRelationMapper + .queryByProcessCode(taskDefinition.getProjectCode(), processDefinitionCode); + List relationList = processTaskRelations.stream() + .filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList()); + updateDag(loginUser, processDefinitionCode, relationList, Lists.newArrayList()); } - return result; } - private void updateDag(User loginUser, Map result, long processDefinitionCode, + private void updateDag(User loginUser, long processDefinitionCode, List processTaskRelationList, List taskDefinitionLogs) { ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); @@ -424,12 +513,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe logger.info( "Save new version task relations complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}.", processDefinition.getProjectCode(), processDefinitionCode, insertVersion); - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, processDefinition); } else { logger.error("Update task relations error, projectCode:{}, processDefinitionCode:{}.", processDefinition.getProjectCode(), processDefinitionCode); - putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } } @@ -461,7 +547,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); - updateDag(loginUser, result, processDefinitionCode, processTaskRelations, + updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate)); } logger.info("Update task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode); @@ -470,6 +556,129 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } + private void TaskDefinitionUpdateValid(TaskDefinition taskDefinitionOriginal, TaskDefinition taskDefinitionUpdate) { + // Task already online + if (processService.isTaskOnline(taskDefinitionOriginal.getCode()) + && taskDefinitionOriginal.getFlag() == Flag.YES) { + // if stream, can update task definition without online check + if (taskDefinitionOriginal.getTaskExecuteType() != TaskExecuteType.STREAM) { + throw new ServiceException(Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION); + } + } + + // not update anything + if (taskDefinitionOriginal.equals(taskDefinitionUpdate)) { + throw new ServiceException(Status.TASK_DEFINITION_NOT_CHANGE, taskDefinitionOriginal.getCode()); + } + + // check version invalid + Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionOriginal.getCode()); + if (version == null || version == 0) { + throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionOriginal.getCode()); + } + } + + /** + * update task definition + * + * @param loginUser login user + * @param taskCode task code + * @param taskUpdateRequest task definition json object + * @return new TaskDefinition have updated + */ + @Transactional + @Override + public TaskDefinition updateTaskDefinitionV2(User loginUser, + long taskCode, + TaskUpdateRequest taskUpdateRequest) { + TaskDefinition taskDefinitionOriginal = taskDefinitionMapper.queryByCode(taskCode); + if (taskDefinitionOriginal == null) { + throw new ServiceException(Status.TASK_DEFINITION_NOT_EXISTS, taskCode); + } + + TaskDefinition taskDefinitionUpdate; + try { + taskDefinitionUpdate = taskUpdateRequest.mergeIntoTaskDefinition(taskDefinitionOriginal); + } catch (InvocationTargetException | IllegalAccessException | InstantiationException + | NoSuchMethodException e) { + throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, taskUpdateRequest.toString()); + } + this.checkTaskDefinitionValid(loginUser, taskDefinitionUpdate, TASK_DEFINITION_UPDATE); + this.TaskDefinitionUpdateValid(taskDefinitionOriginal, taskDefinitionUpdate); + + int update = taskDefinitionMapper.updateById(taskDefinitionUpdate); + if (update <= 0) { + throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); + } + TaskDefinitionLog taskDefinitionLog = this.persist2TaskDefinitionLog(loginUser, taskDefinitionUpdate); + + List taskRelationList = + processTaskRelationMapper.queryUpstreamByCode(taskDefinitionUpdate.getProjectCode(), taskCode); + if (!taskRelationList.isEmpty()) { + logger.info( + "Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.", + taskCode); + long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); + List processTaskRelations = processTaskRelationMapper + .queryByProcessCode(taskDefinitionUpdate.getProjectCode(), processDefinitionCode); + updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionLog)); + } + + this.updateTaskUpstreams(loginUser, taskUpdateRequest.getWorkflowCode(), taskDefinitionUpdate.getCode(), + taskUpdateRequest.getUpstreamTasksCodes()); + + return taskDefinitionUpdate; + } + + /** + * Get resource task definition by code + * + * @param loginUser login user + * @param taskCode task code + * @return TaskDefinition + */ + @Override + public TaskDefinition getTaskDefinition(User loginUser, + long taskCode) { + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); + if (taskDefinition == null) { + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode); + } + Project project = projectMapper.queryByCode(taskDefinition.getProjectCode()); + projectService.checkProjectAndAuthThrowException(loginUser, project, TASK_DEFINITION); + return taskDefinition; + } + + /** + * Get resource task definition according to query parameter + * + * @param loginUser login user + * @param taskFilterRequest taskFilterRequest object you want to filter the resource task definitions + * @return TaskDefinitions of page + */ + @Override + public PageInfo filterTaskDefinition(User loginUser, + TaskFilterRequest taskFilterRequest) { + TaskDefinition taskDefinition = taskFilterRequest.convert2TaskDefinition(); + if (taskDefinition.getProjectName() != null) { + Project project = projectMapper.queryByName(taskDefinition.getProjectName()); + // check user access for project + projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION); + taskDefinition.setProjectCode(project.getCode()); + } + + Page page = + new Page<>(taskFilterRequest.getPageNo(), taskFilterRequest.getPageSize()); + IPage taskDefinitionIPage = + taskDefinitionMapper.filterTaskDefinition(page, taskDefinition); + + PageInfo pageInfo = + new PageInfo<>(taskFilterRequest.getPageNo(), taskFilterRequest.getPageSize()); + pageInfo.setTotal((int) taskDefinitionIPage.getTotal()); + pageInfo.setTotalList(taskDefinitionIPage.getRecords()); + return pageInfo; + } + private TaskDefinitionLog updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, Map result) { Project project = projectMapper.queryByCode(projectCode); @@ -628,7 +837,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe if (queryUpStreamTaskCodeMap.isEmpty() && !processTaskRelationList.isEmpty()) { processTaskRelationList.add(processTaskRelationList.get(0)); } - updateDag(loginUser, result, taskRelation.getProcessDefinitionCode(), processTaskRelations, + updateDag(loginUser, taskRelation.getProcessDefinitionCode(), processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate)); } logger.info( @@ -686,7 +895,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); - updateDag(loginUser, result, processDefinitionCode, processTaskRelations, + updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionUpdate)); } else { logger.info( diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java index d92bc876c5..0829486a86 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java @@ -17,7 +17,11 @@ package org.apache.dolphinscheduler.api.service; +import static org.mockito.ArgumentMatchers.isA; + +import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationCreateRequest; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.impl.ProcessTaskRelationServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.Constants; @@ -41,6 +45,7 @@ import org.apache.commons.collections.CollectionUtils; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,13 +54,17 @@ import java.util.Set; import java.util.stream.Collectors; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.google.common.collect.Lists; /** @@ -91,17 +100,21 @@ public class ProcessTaskRelationServiceTest { @Mock private ProcessService processService; - /** - * get Mock Admin User - * - * @return admin user - */ - private User getAdminUser() { + private static final long PROJECT_CODE = 1L; + private static final long PROCESS_DEFINITION_CODE = 2L; + private static final long UPSTREAM_TASK_CODE = 3L; + private static final long DOWNSTREAM_TASK_CODE = 4L; + protected User user; + protected Exception exception; + + @Before + public void before() { User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserName("admin"); + loginUser.setId(1); + loginUser.setTenantId(2); loginUser.setUserType(UserType.GENERAL_USER); - return loginUser; + loginUser.setUserName("admin"); + user = loginUser; } /** @@ -131,6 +144,7 @@ public class ProcessTaskRelationServiceTest { private TaskDefinitionLog buildTaskDefinitionLog(long projectCode, long code, int version) { TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog() { + @Override public boolean equals(Object o) { if (this == o) { @@ -159,6 +173,7 @@ public class ProcessTaskRelationServiceTest { private TaskDefinition buildTaskDefinition(long projectCode, long code, int version) { TaskDefinition taskDefinition = new TaskDefinition() { + @Override public boolean equals(Object o) { if (this == o) { @@ -210,7 +225,7 @@ public class ProcessTaskRelationServiceTest { return processTaskRelationList; } - private List getProcessTaskDownstreamRelationList(long projectCode,long taskCode) { + private List getProcessTaskDownstreamRelationList(long projectCode, long taskCode) { ProcessTaskRelation processTaskRelationDownstream0 = new ProcessTaskRelation(); processTaskRelationDownstream0.setPreTaskCode(taskCode); processTaskRelationDownstream0.setPostTaskCode(456); @@ -264,27 +279,29 @@ public class ProcessTaskRelationServiceTest { Project project = getProject(projectCode); Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); - Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result); + Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result); Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition()); - Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(Lists.newArrayList()); + Mockito.when( + processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)) + .thenReturn(Lists.newArrayList()); Mockito.when(taskDefinitionMapper.queryByCode(postTaskCode)).thenReturn(getTaskDefinition()); - List processTaskRelationList = Lists.newArrayList(); - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.setProjectCode(projectCode); - processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); - processTaskRelationLog.setPreTaskCode(0L); - processTaskRelationLog.setPreTaskVersion(0); - processTaskRelationLog.setPostTaskCode(postTaskCode); - processTaskRelationLog.setPostTaskVersion(1); - processTaskRelationList.add(processTaskRelationLog); + + List processTaskRelationList = Lists.newArrayList(); + List processTaskRelationLogList = Lists.newArrayList(); + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setProjectCode(projectCode); + processTaskRelation.setProcessDefinitionCode(processDefinitionCode); + processTaskRelation.setPreTaskCode(0L); + processTaskRelation.setPreTaskVersion(0); + processTaskRelation.setPostTaskCode(postTaskCode); + processTaskRelation.setPostTaskVersion(1); + processTaskRelationList.add(processTaskRelation); + processTaskRelationLogList.add(new ProcessTaskRelationLog(processTaskRelation)); + Mockito.when(processTaskRelationMapper.batchInsert(processTaskRelationList)).thenReturn(1); - Mockito.when(processTaskRelationLogMapper.batchInsert(processTaskRelationList)).thenReturn(1); + Mockito.when(processTaskRelationLogMapper.batchInsert(processTaskRelationLogList)).thenReturn(1); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } @@ -296,17 +313,13 @@ public class ProcessTaskRelationServiceTest { Project project = getProject(projectCode); Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); - Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result); + Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result); - List processTaskRelationList = getProcessTaskDownstreamRelationList(projectCode,taskCode); + List processTaskRelationList = getProcessTaskDownstreamRelationList(projectCode, taskCode); - Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode,taskCode)) + Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode)) .thenReturn(processTaskRelationList); if (CollectionUtils.isNotEmpty(processTaskRelationList)) { @@ -327,8 +340,7 @@ public class ProcessTaskRelationServiceTest { TaskDefinitionLog taskDefinitionLog = buildTaskDefinitionLog( processTaskRelation.getProjectCode(), processTaskRelation.getPostTaskCode(), - processTaskRelation.getPostTaskVersion() - ); + processTaskRelation.getPostTaskVersion()); return taskDefinitionLog; }) .collect(Collectors.toSet()); @@ -337,7 +349,7 @@ public class ProcessTaskRelationServiceTest { .thenReturn(taskDefinitionLogList); } Map relation = processTaskRelationService - .queryDownstreamRelation(loginUser, projectCode, taskCode); + .queryDownstreamRelation(user, projectCode, taskCode); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); Assert.assertEquals(2, ((List) relation.get("data")).size()); } @@ -350,16 +362,13 @@ public class ProcessTaskRelationServiceTest { Project project = getProject(projectCode); Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); - Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result); - List processTaskRelationList = getProcessTaskUpstreamRelationList(projectCode,taskCode); + Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result); + List processTaskRelationList = getProcessTaskUpstreamRelationList(projectCode, taskCode); - Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode)).thenReturn(processTaskRelationList); + Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode)) + .thenReturn(processTaskRelationList); if (CollectionUtils.isNotEmpty(processTaskRelationList)) { Set taskDefinitions = processTaskRelationList @@ -388,7 +397,7 @@ public class ProcessTaskRelationServiceTest { .thenReturn(taskDefinitionLogList); } Map relation = processTaskRelationService - .queryUpstreamRelation(loginUser, projectCode, taskCode); + .queryUpstreamRelation(user, projectCode, taskCode); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); Assert.assertEquals(2, ((List) relation.get("data")).size()); } @@ -400,12 +409,9 @@ public class ProcessTaskRelationServiceTest { Project project = getProject(projectCode); Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result); + Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result); List processTaskRelationList = new ArrayList<>(); ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); processTaskRelation.setProjectCode(projectCode); @@ -413,14 +419,17 @@ public class ProcessTaskRelationServiceTest { processTaskRelation.setPreTaskCode(taskCode); processTaskRelation.setPostTaskCode(123L); processTaskRelationList.add(processTaskRelation); - Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode)).thenReturn(processTaskRelationList); + Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode)) + .thenReturn(processTaskRelationList); ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); ProcessDefinition processDefinition = getProcessDefinition(); Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(processDefinition); - Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)).thenReturn(1); - Map result1 = processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode, "123", taskCode); + Mockito.when(processService.saveProcessDefine(user, processDefinition, Boolean.TRUE, Boolean.TRUE)) + .thenReturn(1); + Map result1 = + processTaskRelationService.deleteDownstreamRelation(user, projectCode, "123", taskCode); Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS)); } @@ -431,9 +440,6 @@ public class ProcessTaskRelationServiceTest { Project project = getProject(projectCode); Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); List processTaskRelationList = Lists.newArrayList(); @@ -445,13 +451,15 @@ public class ProcessTaskRelationServiceTest { processTaskRelation.setPostTaskCode(taskCode); processTaskRelation.setPostTaskVersion(1); processTaskRelationList.add(processTaskRelation); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result); - Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode)).thenReturn(processTaskRelationList); + Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result); + Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode)) + .thenReturn(processTaskRelationList); Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(getProcessDefinition()); Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList); - List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); - Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L, - 1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0); + List relationLogs = + processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + Mockito.when(processService.saveTaskRelation(user, 1L, 1L, + 1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } @@ -465,13 +473,12 @@ public class ProcessTaskRelationServiceTest { Project project = getProject(projectCode); Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result); - Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(Lists.newArrayList()); + Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result); + Mockito.when( + processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)) + .thenReturn(Lists.newArrayList()); Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition()); Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition()); TaskDefinition taskDefinition = new TaskDefinition(); @@ -486,10 +493,12 @@ public class ProcessTaskRelationServiceTest { processTaskRelation.setPostTaskCode(taskCode); processTaskRelation.setPostTaskVersion(1); processTaskRelationList.add(processTaskRelation); - Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode)).thenReturn(processTaskRelationList); - List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); - Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L, - 1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0); + Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode)) + .thenReturn(processTaskRelationList); + List relationLogs = + processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + Mockito.when(processService.saveTaskRelation(user, 1L, 1L, + 1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } @@ -502,12 +511,9 @@ public class ProcessTaskRelationServiceTest { Project project = getProject(projectCode); Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result); + Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result); ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); processTaskRelation.setProjectCode(projectCode); processTaskRelation.setProcessDefinitionCode(processDefinitionCode); @@ -515,14 +521,111 @@ public class ProcessTaskRelationServiceTest { processTaskRelation.setPreTaskCode(preTaskCode); processTaskRelation.setPostTaskCode(postTaskCode); ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); - processTaskRelationLog.setOperator(loginUser.getId()); + processTaskRelationLog.setOperator(user.getId()); List processTaskRelationList = new ArrayList<>(); processTaskRelationList.add(processTaskRelation); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result); + Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result); Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList); - List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); - Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L, - 1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0); + List relationLogs = + processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + Mockito.when(processService.saveTaskRelation(user, 1L, 1L, + 1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } + + @Test + public void testCreateProcessTaskRelationV2() { + TaskRelationCreateRequest taskRelationCreateRequest = new TaskRelationCreateRequest(); + taskRelationCreateRequest.setWorkflowCode(PROCESS_DEFINITION_CODE); + + // error process definition not exists + exception = Assertions.assertThrows(ServiceException.class, + () -> processTaskRelationService.createProcessTaskRelationV2(user, taskRelationCreateRequest)); + Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); + + // error project without permissions + Mockito.when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition()); + Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject(PROJECT_CODE)); + Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService) + .checkProjectAndAuthThrowException(user, getProject(PROJECT_CODE), null); + exception = Assertions.assertThrows(ServiceException.class, + () -> processTaskRelationService.createProcessTaskRelationV2(user, taskRelationCreateRequest)); + Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(), + ((ServiceException) exception).getCode()); + + // error insert process task relation + Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(PROJECT_CODE), + null); + Mockito.when(processTaskRelationMapper.insert(isA(ProcessTaskRelation.class))).thenReturn(0); + Mockito.when(taskDefinitionMapper.queryByCode(isA(Long.class))).thenReturn(getTaskDefinition()); + exception = Assertions.assertThrows(ServiceException.class, + () -> processTaskRelationService.createProcessTaskRelationV2(user, taskRelationCreateRequest)); + Assertions.assertEquals(Status.CREATE_PROCESS_TASK_RELATION_ERROR.getCode(), + ((ServiceException) exception).getCode()); + + // error insert process task relation log + Mockito.when(processTaskRelationMapper.insert(isA(ProcessTaskRelation.class))).thenReturn(1); + Mockito.when(processTaskRelationLogMapper.insert(isA(ProcessTaskRelationLog.class))).thenReturn(0); + exception = Assertions.assertThrows(ServiceException.class, + () -> processTaskRelationService.createProcessTaskRelationV2(user, taskRelationCreateRequest)); + Assertions.assertEquals(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR.getCode(), + ((ServiceException) exception).getCode()); + + // success + Mockito.when(processTaskRelationLogMapper.insert(isA(ProcessTaskRelationLog.class))).thenReturn(1); + Assertions.assertDoesNotThrow( + () -> processTaskRelationService.createProcessTaskRelationV2(user, taskRelationCreateRequest)); + } + + @Test + public void testDeleteTaskProcessRelationV2() { + TaskRelationCreateRequest taskRelationCreateRequest = new TaskRelationCreateRequest(); + taskRelationCreateRequest.setWorkflowCode(PROCESS_DEFINITION_CODE); + + // error task relation size + Mockito.when( + processTaskRelationMapper.filterProcessTaskRelation(isA(Page.class), isA(ProcessTaskRelation.class))) + .thenReturn(getMultiProcessTaskRelations()); + exception = Assertions.assertThrows(ServiceException.class, () -> processTaskRelationService + .deleteTaskProcessRelationV2(user, UPSTREAM_TASK_CODE, DOWNSTREAM_TASK_CODE)); + Assertions.assertEquals(Status.PROCESS_TASK_RELATION_NOT_EXPECT.getCode(), + ((ServiceException) exception).getCode()); + + // success + Mockito.when( + processTaskRelationMapper.filterProcessTaskRelation(isA(Page.class), isA(ProcessTaskRelation.class))) + .thenReturn(getOneProcessTaskRelation()); + Assertions.assertDoesNotThrow(() -> processTaskRelationService.deleteTaskProcessRelationV2(user, + UPSTREAM_TASK_CODE, DOWNSTREAM_TASK_CODE)); + } + + private IPage getOneProcessTaskRelation() { + IPage processTaskRelationIPage = new Page<>(); + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE); + processTaskRelation.setPreTaskCode(UPSTREAM_TASK_CODE); + processTaskRelation.setPostTaskCode(DOWNSTREAM_TASK_CODE); + processTaskRelationIPage.setRecords(Collections.singletonList(processTaskRelation)); + return processTaskRelationIPage; + } + + private IPage getMultiProcessTaskRelations() { + IPage processTaskRelationIPage = new Page<>(); + List processTaskRelations = new ArrayList<>(); + + ProcessTaskRelation processTaskRelation0 = new ProcessTaskRelation(); + processTaskRelation0.setProcessDefinitionCode(PROCESS_DEFINITION_CODE); + processTaskRelation0.setPreTaskCode(UPSTREAM_TASK_CODE); + processTaskRelation0.setPostTaskCode(DOWNSTREAM_TASK_CODE); + processTaskRelations.add(processTaskRelation0); + + ProcessTaskRelation processTaskRelation1 = new ProcessTaskRelation(); + processTaskRelation1.setProcessDefinitionCode(PROCESS_DEFINITION_CODE); + processTaskRelation1.setPreTaskCode(UPSTREAM_TASK_CODE); + processTaskRelation1.setPostTaskCode(DOWNSTREAM_TASK_CODE); + processTaskRelations.add(processTaskRelation1); + + processTaskRelationIPage.setRecords(processTaskRelations); + return processTaskRelationIPage; + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index 6a969b182b..c63684ebc0 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -17,17 +17,34 @@ package org.apache.dolphinscheduler.api.service; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION; +import static org.mockito.ArgumentMatchers.isA; + +import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest; +import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest; +import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest; +import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.api.service.impl.TaskDefinitionServiceImpl; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; @@ -42,15 +59,15 @@ import java.util.List; import java.util.Map; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*; - @RunWith(MockitoJUnitRunner.class) public class TaskDefinitionServiceImplTest { @@ -78,148 +95,156 @@ public class TaskDefinitionServiceImplTest { @Mock private TaskPluginManager taskPluginManager; - @Test - public void createTaskDefinition() { - long projectCode = 1L; + @Mock + private ProcessTaskRelationService processTaskRelationService; - Project project = getProject(projectCode); - Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); + @Mock + private ProcessDefinitionMapper processDefinitionMapper; + @Mock + private ProcessDefinitionService processDefinitionService; + + private static final String TASK_PARAMETER = + "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";; + private static final long PROJECT_CODE = 1L; + private static final long PROCESS_DEFINITION_CODE = 2L; + private static final long TASK_CODE = 3L; + private static final int VERSION = 1; + private static final int RESOURCE_RATE = -1; + protected User user; + protected Exception exception; + + @Before + public void before() { User loginUser = new User(); - loginUser.setId(-1); + loginUser.setId(1); + loginUser.setTenantId(2); loginUser.setUserType(UserType.GENERAL_USER); + loginUser.setUserName("admin"); + user = loginUser; + } + + @Test + public void createTaskDefinition() { + Project project = getProject(); + Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); Map result = new HashMap<>(); - putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_CREATE)).thenReturn(result); - - String createTaskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" - + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," - + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" - + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," - + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," - + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," - + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; + putMsg(result, Status.SUCCESS, PROJECT_CODE); + Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION_CREATE)) + .thenReturn(result); + + String createTaskDefinitionJson = + "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" + + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," + + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" + + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," + + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," + + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," + + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; List taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class); - Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions, Boolean.TRUE)).thenReturn(1); + Mockito.when(processService.saveTaskDefine(user, PROJECT_CODE, taskDefinitions, Boolean.TRUE)) + .thenReturn(1); Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true); Map relation = taskDefinitionService - .createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson); + .createTaskDefinition(user, PROJECT_CODE, createTaskDefinitionJson); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); - } @Test public void updateTaskDefinition() { - String taskDefinitionJson = "{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" - + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," - + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" - + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," - + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," - + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," - + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}"; - long projectCode = 1L; - long taskCode = 1L; - - Project project = getProject(projectCode); - Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); + String taskDefinitionJson = + "{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" + + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," + + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" + + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," + + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," + + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," + + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}"; + Project project = getProject(); + Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); Map result = new HashMap<>(); - putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION_UPDATE)).thenReturn(result); + putMsg(result, Status.SUCCESS, PROJECT_CODE); + Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION_UPDATE)) + .thenReturn(result); - Mockito.when(processService.isTaskOnline(taskCode)).thenReturn(Boolean.FALSE); - Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(new TaskDefinition()); + Mockito.when(processService.isTaskOnline(TASK_CODE)).thenReturn(Boolean.FALSE); + Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new TaskDefinition()); Mockito.when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1); Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1); - Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode)).thenReturn(1); + Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1); Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true); - result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, taskCode, taskDefinitionJson); + result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } @Test public void queryTaskDefinitionByName() { String taskName = "task"; - long projectCode = 1L; - long processCode = 1L; - Project project = getProject(projectCode); - Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); + Project project = getProject(); + Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); Map result = new HashMap<>(); - putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION )).thenReturn(result); + putMsg(result, Status.SUCCESS, PROJECT_CODE); + Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION)) + .thenReturn(result); - Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName)) - .thenReturn(new TaskDefinition()); + Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), PROCESS_DEFINITION_CODE, taskName)) + .thenReturn(new TaskDefinition()); Map relation = taskDefinitionService - .queryTaskDefinitionByName(loginUser, projectCode, processCode, taskName); + .queryTaskDefinitionByName(user, PROJECT_CODE, PROCESS_DEFINITION_CODE, taskName); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); } @Test public void deleteTaskDefinitionByCode() { - long projectCode = 1L; - long taskCode = 1L; - - Project project = getProject(projectCode); - Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); - - Map result = new HashMap<>(); - putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION_DELETE )).thenReturn(result); - Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition()); - Mockito.when(processTaskRelationMapper.queryDownstreamByTaskCode(taskCode)) - .thenReturn(new ArrayList<>()); - Mockito.when(taskDefinitionMapper.deleteByCode(taskCode)) - .thenReturn(1); - - Map relation = taskDefinitionService - .deleteTaskDefinitionByCode(loginUser, projectCode, taskCode); - - Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); + Project project = getProject(); + Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); + + // error task definition not find + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE)); + Assertions.assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); + + // error delete single task definition object + Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition()); + Mockito.when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(0); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE)); + Assertions.assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR.getCode(), + ((ServiceException) exception).getCode()); + + // success + Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, project, + TASK_DEFINITION_DELETE); + Mockito.when(processTaskRelationMapper.queryDownstreamByTaskCode(TASK_CODE)).thenReturn(new ArrayList<>()); + Mockito.when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(1); + Assertions.assertDoesNotThrow(() -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE)); } @Test public void switchVersion() { - int version = 1; - long taskCode = 11L; - long projectCode = 1L; - - Project project = getProject(projectCode); - Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); + Project project = getProject(); + Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); Map result = new HashMap<>(); - putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_SWITCH_TO_THIS_VERSION)).thenReturn(result); + putMsg(result, Status.SUCCESS, PROJECT_CODE); + Mockito.when( + projectService.checkProjectAndAuth(user, project, PROJECT_CODE, WORKFLOW_SWITCH_TO_THIS_VERSION)) + .thenReturn(result); - Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version)) - .thenReturn(new TaskDefinitionLog()); + Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, VERSION)) + .thenReturn(new TaskDefinitionLog()); TaskDefinition taskDefinition = new TaskDefinition(); - taskDefinition.setProjectCode(projectCode); - Mockito.when(taskDefinitionMapper.queryByCode(taskCode)) - .thenReturn(taskDefinition); + taskDefinition.setProjectCode(PROJECT_CODE); + Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)) + .thenReturn(taskDefinition); Mockito.when(taskDefinitionMapper.updateById(new TaskDefinitionLog())).thenReturn(1); Map relation = taskDefinitionService - .switchVersion(loginUser, projectCode, taskCode, version); + .switchVersion(user, PROJECT_CODE, TASK_CODE, VERSION); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); } @@ -233,56 +258,36 @@ public class TaskDefinitionServiceImplTest { } } - /** - * get mock Project - * - * @param projectCode projectCode - * @return Project - */ - private Project getProject(long projectCode) { - Project project = new Project(); - project.setId(1); - project.setCode(projectCode); - project.setName("test"); - project.setUserId(1); - return project; - } - - private TaskDefinition getTaskDefinition() { - TaskDefinition taskDefinition = new TaskDefinition(); - taskDefinition.setProjectCode(1L); - taskDefinition.setCode(1L); - taskDefinition.setVersion(1); - taskDefinition.setTaskType("SHELL"); - return taskDefinition; - } - @Test public void checkJson() { - String taskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" - + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," - + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" - + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," - + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," - + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," - + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; + String taskDefinitionJson = + "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" + + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," + + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" + + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," + + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," + + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," + + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); Assert.assertFalse(taskDefinitionLogs.isEmpty()); - String taskJson = "[{\"name\":\"shell1\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[]," - + "\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}," - + "\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\"," - + "\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"},{\"name\":\"shell2\",\"description\":\"\"," - + "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 2\",\"conditionResult\":{\"successNode\"" - + ":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\"," - + "\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"}]"; + String taskJson = + "[{\"name\":\"shell1\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[]," + + "\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}," + + "\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\"," + + "\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"},{\"name\":\"shell2\",\"description\":\"\"," + + "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 2\",\"conditionResult\":{\"successNode\"" + + ":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\"," + + "\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"}]"; taskDefinitionLogs = JSONUtils.toList(taskJson, TaskDefinitionLog.class); Assert.assertFalse(taskDefinitionLogs.isEmpty()); - String taskParams = "{\"resourceList\":[],\"localParams\":[{\"prop\":\"datetime\",\"direct\":\"IN\",\"type\":\"VARCHAR\"," - + "\"value\":\"${system.datetime}\"}],\"rawScript\":\"echo ${datetime}\",\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"]," - + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":{}}"; + String taskParams = + "{\"resourceList\":[],\"localParams\":[{\"prop\":\"datetime\",\"direct\":\"IN\",\"type\":\"VARCHAR\"," + + "\"value\":\"${system.datetime}\"}],\"rawScript\":\"echo ${datetime}\",\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"]," + + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":{}}"; Map parameters = JSONUtils.parseObject(taskParams, Map.class); Assert.assertNotNull(parameters); - String params = "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}"; + String params = + "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}"; Map parameters1 = JSONUtils.parseObject(params, Map.class); Assert.assertNotNull(parameters1); } @@ -295,43 +300,238 @@ public class TaskDefinitionServiceImplTest { @Test public void testReleaseTaskDefinition() { - long projectCode = 1L; - long taskCode = 1L; - Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); - - Project project = getProject(projectCode); - User loginUser = new User(); - loginUser.setId(1); - loginUser.setUserType(UserType.GENERAL_USER); + Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); + Project project = getProject(); // check task dose not exist Map result = new HashMap<>(); - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result); - Map map = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.OFFLINE); + putMsg(result, Status.TASK_DEFINE_NOT_EXIST, TASK_CODE); + Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, null)).thenReturn(result); + Map map = + taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.OFFLINE); Assert.assertEquals(Status.TASK_DEFINE_NOT_EXIST, map.get(Constants.STATUS)); // process definition offline putMsg(result, Status.SUCCESS); TaskDefinition taskDefinition = new TaskDefinition(); - taskDefinition.setProjectCode(projectCode); + taskDefinition.setProjectCode(PROJECT_CODE); taskDefinition.setVersion(1); - taskDefinition.setCode(taskCode); - String params = "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}"; + taskDefinition.setCode(TASK_CODE); + String params = + "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}"; taskDefinition.setTaskParams(params); taskDefinition.setTaskType("SHELL"); - Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition); + Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition); TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition); - Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinition.getVersion())).thenReturn(taskDefinitionLog); - Map offlineTaskResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.OFFLINE); + Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, taskDefinition.getVersion())) + .thenReturn(taskDefinitionLog); + Map offlineTaskResult = + taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.OFFLINE); Assert.assertEquals(Status.SUCCESS, offlineTaskResult.get(Constants.STATUS)); // process definition online, resource exist - Map onlineTaskResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.ONLINE); + Map onlineTaskResult = + taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.ONLINE); Assert.assertEquals(Status.SUCCESS, onlineTaskResult.get(Constants.STATUS)); // release error code - Map failResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.getEnum(2)); + Map failResult = + taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.getEnum(2)); Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS)); } + + @Test + public void testCreateTaskDefinitionV2() { + TaskCreateRequest taskCreateRequest = new TaskCreateRequest(); + taskCreateRequest.setProjectCode(PROJECT_CODE); + taskCreateRequest.setWorkflowCode(PROCESS_DEFINITION_CODE); + + // error process definition not find + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); + Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); + + // error project not find + Mockito.when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition()); + Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); + Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) + .checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_CREATE); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); + Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); + + // error task definition + taskCreateRequest.setTaskParams(TASK_PARAMETER); + Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), + TASK_DEFINITION_CREATE); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); + Assertions.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID.getCode(), + ((ServiceException) exception).getCode()); + + // error create task definition object + Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true); + Mockito.when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(0); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); + Assertions.assertEquals(Status.CREATE_TASK_DEFINITION_ERROR.getCode(), + ((ServiceException) exception).getCode()); + + // error sync to task definition log + Mockito.when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(1); + Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); + Assertions.assertEquals(Status.CREATE_TASK_DEFINITION_LOG_ERROR.getCode(), + ((ServiceException) exception).getCode()); + + // success + Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1); + // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService + Mockito.when(processTaskRelationService.updateUpstreamTaskDefinition(isA(User.class), isA(Long.class), + isA(TaskRelationUpdateUpstreamRequest.class))).thenReturn(getProcessTaskRelationList()); + Mockito.when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class), + isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition()); + Assertions.assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); + } + + @Test + public void testUpdateTaskDefinitionV2() { + TaskUpdateRequest taskUpdateRequest = new TaskUpdateRequest(); + + // error task definition not exists + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + Assertions.assertEquals(Status.TASK_DEFINITION_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode()); + + // error project not find + Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition()); + Mockito.when(projectMapper.queryByCode(isA(Long.class))).thenReturn(getProject()); + Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) + .checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_UPDATE); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); + + // error task definition + Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), + TASK_DEFINITION_UPDATE); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + Assertions.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID.getCode(), + ((ServiceException) exception).getCode()); + + // error task definition already online + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + Assertions.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID.getCode(), + ((ServiceException) exception).getCode()); + + // error task definition nothing update + Mockito.when(processService.isTaskOnline(TASK_CODE)).thenReturn(false); + Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + Assertions.assertEquals(Status.TASK_DEFINITION_NOT_CHANGE.getCode(), ((ServiceException) exception).getCode()); + + // error task definition version invalid + taskUpdateRequest.setTaskPriority(String.valueOf(Priority.HIGH)); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + Assertions.assertEquals(Status.DATA_IS_NOT_VALID.getCode(), ((ServiceException) exception).getCode()); + + // error task definition update effect number + Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(VERSION); + Mockito.when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(0); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + Assertions.assertEquals(Status.UPDATE_TASK_DEFINITION_ERROR.getCode(), + ((ServiceException) exception).getCode()); + + // error task definition log insert + Mockito.when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(1); + Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + Assertions.assertEquals(Status.CREATE_TASK_DEFINITION_LOG_ERROR.getCode(), + ((ServiceException) exception).getCode()); + + // success + Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1); + // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService + Mockito.when(processTaskRelationService.updateUpstreamTaskDefinition(isA(User.class), isA(Long.class), + isA(TaskRelationUpdateUpstreamRequest.class))).thenReturn(getProcessTaskRelationList()); + Assertions.assertDoesNotThrow( + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + } + + @Test + public void testGetTaskDefinition() { + // error task definition not exists + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.getTaskDefinition(user, TASK_CODE)); + Assertions.assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); + + // error task definition not exists + Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition()); + Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); + Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService) + .checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.getTaskDefinition(user, TASK_CODE)); + Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(), + ((ServiceException) exception).getCode()); + + // success + Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION); + Assertions.assertDoesNotThrow(() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE)); + } + + /** + * get mock Project + * + * @return Project + */ + private Project getProject() { + Project project = new Project(); + project.setId(1); + project.setCode(PROJECT_CODE); + project.setName("test"); + project.setUserId(1); + return project; + } + + private ProcessDefinition getProcessDefinition() { + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setProjectCode(PROJECT_CODE); + processDefinition.setCode(PROCESS_DEFINITION_CODE); + processDefinition.setVersion(VERSION); + return processDefinition; + } + + private TaskDefinition getTaskDefinition() { + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setProjectCode(PROJECT_CODE); + taskDefinition.setCode(TASK_CODE); + taskDefinition.setVersion(VERSION); + taskDefinition.setTaskType("SHELL"); + taskDefinition.setTaskParams(TASK_PARAMETER); + taskDefinition.setFlag(Flag.YES); + taskDefinition.setCpuQuota(RESOURCE_RATE); + taskDefinition.setMemoryMax(RESOURCE_RATE); + return taskDefinition; + } + + private List getProcessTaskRelationList() { + List processTaskRelationList = new ArrayList<>(); + + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setProjectCode(PROJECT_CODE); + processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE); + processTaskRelation.setPreTaskCode(TASK_CODE); + processTaskRelation.setPostTaskCode(TASK_CODE + 1L); + + processTaskRelationList.add(processTaskRelation); + return processTaskRelationList; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java index fc993c3add..ec5fba8403 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java @@ -42,7 +42,6 @@ public class ProcessDefinitionLog extends ProcessDefinition { } public ProcessDefinitionLog(ProcessDefinition processDefinition) { - this.setId(processDefinition.getId()); this.setCode(processDefinition.getCode()); this.setName(processDefinition.getName()); this.setVersion(processDefinition.getVersion()); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java index bef2630d8f..09f5cf65cb 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java @@ -113,9 +113,7 @@ public class ProcessTaskRelation { long postTaskCode, int postTaskVersion, ConditionType conditionType, - String conditionParams, - Date createTime, - Date updateTime) { + String conditionParams) { this.name = name; this.processDefinitionVersion = processDefinitionVersion; this.projectCode = projectCode; @@ -126,8 +124,26 @@ public class ProcessTaskRelation { this.postTaskVersion = postTaskVersion; this.conditionType = conditionType; this.conditionParams = conditionParams; - this.createTime = createTime; - this.updateTime = updateTime; + + Date now = new Date(); + this.createTime = now; + this.updateTime = now; + } + + public ProcessTaskRelation(ProcessTaskRelationLog processTaskRelationLog) { + this.name = processTaskRelationLog.getName(); + this.processDefinitionVersion = processTaskRelationLog.getProcessDefinitionVersion(); + this.projectCode = processTaskRelationLog.getProjectCode(); + this.processDefinitionCode = processTaskRelationLog.getProcessDefinitionCode(); + this.preTaskCode = processTaskRelationLog.getPreTaskCode(); + this.preTaskVersion = processTaskRelationLog.getPreTaskVersion(); + this.postTaskCode = processTaskRelationLog.getPostTaskCode(); + this.postTaskVersion = processTaskRelationLog.getPostTaskVersion(); + this.conditionType = processTaskRelationLog.getConditionType(); + this.conditionParams = processTaskRelationLog.getConditionParams(); + + this.createTime = processTaskRelationLog.getCreateTime(); + this.updateTime = new Date(); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java index 3bc3305d0c..f098457bc3 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java @@ -43,7 +43,6 @@ public class ProcessTaskRelationLog extends ProcessTaskRelation { public ProcessTaskRelationLog(ProcessTaskRelation processTaskRelation) { super(); - this.setId(processTaskRelation.getId()); this.setName(processTaskRelation.getName()); this.setProcessDefinitionCode(processTaskRelation.getProcessDefinitionCode()); this.setProcessDefinitionVersion(processTaskRelation.getProcessDefinitionVersion()); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java index f070fdb165..3701956612 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java @@ -43,7 +43,6 @@ public class TaskDefinitionLog extends TaskDefinition { public TaskDefinitionLog(TaskDefinition taskDefinition) { super(); - this.setId(taskDefinition.getId()); this.setCode(taskDefinition.getCode()); this.setVersion(taskDefinition.getVersion()); this.setName(taskDefinition.getName()); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java index 4332b54960..4549cc15d0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java @@ -30,6 +30,7 @@ import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.Cacheable; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; /** * process task relation mapper interface @@ -86,7 +87,7 @@ public interface ProcessTaskRelationMapper extends BaseMapper taskRelationList); + int batchInsert(@Param("taskRelationList") List taskRelationList); /** * query downstream process task relation by taskCode @@ -103,7 +104,8 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryUpstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode); + List queryUpstreamByCode(@Param("projectCode") long projectCode, + @Param("taskCode") long taskCode); /** * query downstream process task relation by taskCode @@ -112,7 +114,8 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryDownstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode); + List queryDownstreamByCode(@Param("projectCode") long projectCode, + @Param("taskCode") long taskCode); /** * query task relation by codes @@ -122,7 +125,9 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryUpstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode, @Param("preTaskCodes") Long[] preTaskCodes); + List queryUpstreamByCodes(@Param("projectCode") long projectCode, + @Param("taskCode") long taskCode, + @Param("preTaskCodes") Long[] preTaskCodes); /** * query process task relation by process definition code @@ -197,5 +202,13 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryDownstreamByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); - + /** + * Filter process task relation + * + * @param page page + * @param processTaskRelation process definition object + * @return process task relation IPage + */ + IPage filterProcessTaskRelation(IPage page, + @Param("relation") ProcessTaskRelation processTaskRelation); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java index 4940b4e1d2..eb3d1f7e40 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java @@ -122,7 +122,7 @@ public interface TaskDefinitionMapper extends BaseMapper { @Param("searchWorkflowName") String searchWorkflowName, @Param("searchTaskName") String searchTaskName, @Param("taskType") String taskType, - @Param("taskExecuteType")TaskExecuteType taskExecuteType); + @Param("taskExecuteType") TaskExecuteType taskExecuteType); /** * query task definition by code list @@ -131,4 +131,14 @@ public interface TaskDefinitionMapper extends BaseMapper { * @return task definition list */ List queryByCodeList(@Param("codes") Collection codes); + + /** + * Filter task definition + * + * @param page page + * @param taskDefinition process definition object + * @return task definition IPage + */ + IPage filterTaskDefinition(IPage page, + @Param("task") TaskDefinition taskDefinition); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml index 2a1892db2b..9bcabf4866 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml @@ -200,4 +200,27 @@ and post_task_code = #{postTaskCode} + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml index a8a09f7271..9c7d6a7b59 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml @@ -135,4 +135,24 @@ + diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 57e16d2250..d25bd083fe 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -2448,16 +2448,17 @@ public class ProcessServiceImpl implements ProcessService { if (!processTaskRelationList.isEmpty()) { processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode()); } - List processTaskRelationLogList = processTaskRelationLogMapper - .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); - int batchInsert = processTaskRelationMapper.batchInsert(processTaskRelationLogList); + List processTaskRelationListFromLog = processTaskRelationLogMapper + .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()).stream() + .map(ProcessTaskRelation::new).collect(Collectors.toList()); + int batchInsert = processTaskRelationMapper.batchInsert(processTaskRelationListFromLog); if (batchInsert == 0) { return Constants.EXIT_CODE_FAILURE; } else { int result = 0; - for (ProcessTaskRelationLog taskRelationLog : processTaskRelationLogList) { - int switchResult = switchTaskDefinitionVersion(taskRelationLog.getPostTaskCode(), - taskRelationLog.getPostTaskVersion()); + for (ProcessTaskRelation taskRelation : processTaskRelationListFromLog) { + int switchResult = switchTaskDefinitionVersion(taskRelation.getPostTaskCode(), + taskRelation.getPostTaskVersion()); if (switchResult != Constants.EXIT_CODE_FAILURE) { result++; } @@ -2673,7 +2674,9 @@ public class ProcessServiceImpl implements ProcessService { } processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode); } - insert = processTaskRelationMapper.batchInsert(taskRelationList); + List processTaskRelations = + taskRelationList.stream().map(ProcessTaskRelation::new).collect(Collectors.toList()); + insert = processTaskRelationMapper.batchInsert(processTaskRelations); } int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList); return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;