Browse Source

[Feature][API] New restful API task and task relation (#12116)

3.2.0-release
Jiajie Zhong 2 years ago committed by GitHub
parent
commit
0e1c8d8153
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 132
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ScheduleV2Controller.java
  3. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
  4. 170
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionV2Controller.java
  5. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowV2Controller.java
  6. 141
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java
  7. 62
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskFilterRequest.java
  8. 175
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskUpdateRequest.java
  9. 58
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationCreateRequest.java
  10. 51
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationDeleteRequest.java
  11. 73
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationFilterRequest.java
  12. 54
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationUpdateUpstreamRequest.java
  13. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java
  14. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  15. 39
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
  16. 56
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
  17. 245
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  18. 331
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  19. 251
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
  20. 456
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  21. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
  22. 26
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
  23. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java
  24. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
  25. 23
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  26. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
  27. 23
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
  28. 20
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
  29. 17
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

132
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_PROCESS_TASK_RELATION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_PROCESS_RELATION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_UPSTREAM_TASK_PROCESS_RELATION_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationCreateRequest;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationDeleteRequest;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.User;
import springfox.documentation.annotations.ApiIgnore;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
/**
* process task relation controller
*/
@Api(tags = "PROCESS_TASK_RELATION_TAG")
@RestController
@RequestMapping("v2/relations")
public class ProcessTaskRelationV2Controller extends BaseController {
@Autowired
private ProcessTaskRelationService processTaskRelationService;
/**
* create resource process task relation
*
* @param loginUser login user
* @param TaskRelationCreateRequest process task definition json contains the object you want to create
* @return Result object created
*/
@ApiOperation(value = "create", notes = "CREATE_PROCESS_TASK_RELATION_NOTES")
@PostMapping(consumes = {"application/json"})
@ResponseStatus(HttpStatus.CREATED)
@ApiException(CREATE_PROCESS_TASK_RELATION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<ProcessTaskRelation> createTaskRelation(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestBody TaskRelationCreateRequest TaskRelationCreateRequest) {
ProcessTaskRelation processTaskRelation =
processTaskRelationService.createProcessTaskRelationV2(loginUser, TaskRelationCreateRequest);
return Result.success(processTaskRelation);
}
/**
* delete resource process task relation
*
* @param loginUser login user
* @param codePair code pair you want to delete the task relation, use `upstream,downstream` as example, will delete exists relation upstream -> downstream, throw error if not exists
* @return delete result code
*/
@ApiOperation(value = "delete", notes = "DELETE_PROCESS_TASK_RELATION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "code-pair", value = "TASK_DEFINITION_CODE", dataTypeClass = long.class, example = "123456,78901", required = true)
})
@DeleteMapping(value = "/{code-pair}")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_TASK_PROCESS_RELATION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteTaskRelation(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("code-pair") String codePair) {
TaskRelationDeleteRequest taskRelationDeleteRequest = new TaskRelationDeleteRequest(codePair);
processTaskRelationService.deleteTaskProcessRelationV2(loginUser, taskRelationDeleteRequest.getUpstreamCode(),
taskRelationDeleteRequest.getDownstreamCode());
return Result.success();
}
/**
* Update resource task relation by code, only update this code's upstreams
*
* @param loginUser login user
* @param code resource task code want to update its upstreams
* @param taskRelationUpdateUpstreamRequest workflowUpdateRequest
* @return ResourceResponse object updated
*/
@ApiOperation(value = "update", notes = "UPDATE_PROCESS_TASK_RELATION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "code", value = "DOWMSTREAM_TASK_DEFINITION_CODE", dataTypeClass = long.class, example = "123456", required = true)
})
@PutMapping(value = "/{code}")
@ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_UPSTREAM_TASK_PROCESS_RELATION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<List<ProcessTaskRelation>> updateUpstreamTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("code") Long code,
@RequestBody TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) {
List<ProcessTaskRelation> processTaskRelations = processTaskRelationService
.updateUpstreamTaskDefinition(loginUser, code, taskRelationUpdateUpstreamRequest);
return Result.success(processTaskRelations);
}
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ScheduleV2Controller.java

@ -150,7 +150,7 @@ public class ScheduleV2Controller extends BaseController {
* @return result Result
*/
@ApiOperation(value = "get", notes = "QUERY_SCHEDULE_LIST_PAGING_NOTES")
@GetMapping(consumes = {"application/json"})
@PostMapping(value = "/filter", consumes = {"application/json"})
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_SCHEDULE_LIST_PAGING_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java

@ -29,6 +29,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.SWITCH_TASK_DEFINITIO
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_DEFINITION_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.Result;
@ -294,8 +295,8 @@ public class TaskDefinitionController extends BaseController {
public Result deleteTaskDefinitionByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "code") long code) {
Map<String, Object> result = taskDefinitionService.deleteTaskDefinitionByCode(loginUser, projectCode, code);
return returnDataList(result);
taskDefinitionService.deleteTaskDefinitionByCode(loginUser, code);
return new Result(Status.SUCCESS);
}
/**

170
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionV2Controller.java

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_SCHEDULE_BY_ID_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DETAIL_OF_TASK_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_LIST;
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_DEFINITION_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import springfox.documentation.annotations.ApiIgnore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
/**
* task definition controller
*/
@Api(tags = "TASK_DEFINITION_TAG")
@RestController
@RequestMapping("v2/tasks")
public class TaskDefinitionV2Controller extends BaseController {
@Autowired
private TaskDefinitionService taskDefinitionService;
/**
* Create resource task definition
*
* @param loginUser login user
* @param taskCreateRequest task definition json
* @return Result object created
*/
@ApiOperation(value = "create", notes = "CREATE_TASK_DEFINITION_NOTES")
@PostMapping(consumes = {"application/json"})
@ResponseStatus(HttpStatus.CREATED)
@ApiException(CREATE_TASK_DEFINITION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<TaskDefinition> createTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestBody TaskCreateRequest taskCreateRequest) {
TaskDefinition taskDefinition = taskDefinitionService.createTaskDefinitionV2(loginUser, taskCreateRequest);
return Result.success(taskDefinition);
}
/**
* Delete resource task definition by code
*
* @param loginUser login user
* @param code task definition code
*/
@ApiOperation(value = "delete", notes = "DELETE_TASK_DEFINITION_VERSION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", dataTypeClass = long.class, example = "123456", required = true)
})
@DeleteMapping(value = "/{code}")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_SCHEDULE_BY_ID_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("code") Long code) {
taskDefinitionService.deleteTaskDefinitionByCode(loginUser, code);
return Result.success();
}
/**
* Update resource task definition by code
*
* @param loginUser login user
* @param code task code of resource you want to update
* @param taskUpdateRequest workflowUpdateRequest
* @return ResourceResponse object updated
*/
@ApiOperation(value = "update", notes = "UPDATE_TASK_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", dataTypeClass = long.class, example = "123456", required = true)
})
@PutMapping(value = "/{code}")
@ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_TASK_DEFINITION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<TaskDefinition> updateTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("code") Long code,
@RequestBody TaskUpdateRequest taskUpdateRequest) {
TaskDefinition taskDefinition =
taskDefinitionService.updateTaskDefinitionV2(loginUser, code, taskUpdateRequest);
return Result.success(taskDefinition);
}
/**
* Get resource task definition by code
*
* @param loginUser login user
* @param code task code of resource you want to update
* @return ResourceResponse object get from condition
*/
@ApiOperation(value = "get", notes = "GET_TASK_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", dataTypeClass = long.class, example = "123456", required = true)
})
@GetMapping(value = "/{code}")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_DETAIL_OF_TASK_DEFINITION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<TaskDefinition> getTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("code") Long code) {
TaskDefinition taskDefinition = taskDefinitionService.getTaskDefinition(loginUser, code);
return Result.success(taskDefinition);
}
/**
* Get resource task definition according to query parameter
*
* @param loginUser login user
* @param taskFilterRequest workflowFilterRequest
* @return PageResourceResponse from condition
*/
@ApiOperation(value = "get", notes = "FILTER_TASK_DEFINITION_NOTES")
@PostMapping(value = "/query", consumes = {"application/json"})
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_PROCESS_DEFINITION_LIST)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<PageInfo<TaskDefinition>> filterTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestBody TaskFilterRequest taskFilterRequest) {
PageInfo<TaskDefinition> taskDefinitions =
taskDefinitionService.filterTaskDefinition(loginUser, taskFilterRequest);
return Result.success(taskDefinitions);
}
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowV2Controller.java

@ -152,7 +152,7 @@ public class WorkflowV2Controller extends BaseController {
* @return PageResourceResponse from condition
*/
@ApiOperation(value = "get", notes = "FILTER_WORKFLOWS_NOTES")
@GetMapping(consumes = {"application/json"})
@PostMapping(value = "/query", consumes = {"application/json"})
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_PROCESS_DEFINITION_LIST)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")

141
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.task;
import static org.apache.dolphinscheduler.common.Constants.VERSION_FIRST;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import java.util.Date;
import lombok.Data;
import io.swagger.annotations.ApiModelProperty;
/**
* task create request
*/
@Data
public class TaskCreateRequest {
@ApiModelProperty(example = "workflow-code", required = true)
private long workflowCode;
@ApiModelProperty(example = "task-name", required = true)
private String name;
@ApiModelProperty(example = "describe what this task actual do", required = true)
private String description;
@ApiModelProperty(example = "6816095515584", dataType = "Long")
private long projectCode;
@ApiModelProperty(example = "SHELL", required = true)
private String taskType;
// todo
@ApiModelProperty(example = "{\"localParams\": [], \"rawScript\": \"echo 1\", \"resourceList\": []}", required = true, notes = "task definition params")
private String taskParams;
@ApiModelProperty(example = "YES", allowableValues = "YES,NO", notes = "default YES is not provided")
private String flag;
@ApiModelProperty(example = "MEDIUM", allowableValues = "HIGHEST,HIGH,MEDIUM,LOW,LOWEST", notes = "default MEDIUM is not provided")
private String taskPriority;
@ApiModelProperty(example = "default", notes = "default 'default' if not provided")
private String workerGroup;
@ApiModelProperty(example = "6563415109312", dataType = "Long")
private long environmentCode;
@ApiModelProperty(example = "0", dataType = "Integer", notes = "default 0 not provided")
private int failRetryTimes;
@ApiModelProperty(example = "1")
private int failRetryInterval;
@ApiModelProperty(example = "SHELL")
private int timeout;
@ApiModelProperty(example = "CLOSE", allowableValues = "CLOSE,OPEN", notes = "default CLOSE is not provided")
private String timeoutFlag;
@ApiModelProperty(example = "MEDIUM", allowableValues = "WARN,FAILED,WARNFAILED", notes = "default MEDIUM is not provided")
private String timeoutNotifyStrategy;
@ApiModelProperty(example = "1,2,3")
private String resourceIds;
@ApiModelProperty(example = "2")
private int taskGroupId;
@ApiModelProperty(example = "1", dataType = "Integer", notes = "A priority number for execute task, the bigger the high priority, default null if not provided")
private int taskGroupPriority;
@ApiModelProperty(example = "0.1", dataType = "Float", notes = "default unlimited if not provided")
private Integer cpuQuota;
@ApiModelProperty(example = "0.1", dataType = "Float", notes = "default unlimited if not provided")
private Integer memoryMax;
@ApiModelProperty(example = "upstream-task-codes1,upstream-task-codes2", notes = "use , to split multiple upstream task codes")
private String upstreamTasksCodes;
public TaskDefinition convert2TaskDefinition() {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setName(this.name);
taskDefinition.setDescription(this.description);
taskDefinition.setProjectCode(this.projectCode);
taskDefinition.setTaskType(this.taskType);
taskDefinition.setTaskParams(this.taskParams);
taskDefinition.setWorkerGroup(this.workerGroup == null ? Constants.DEFAULT_WORKER_GROUP : this.workerGroup);
taskDefinition.setEnvironmentCode(this.environmentCode);
taskDefinition.setFailRetryTimes(this.failRetryTimes);
taskDefinition.setFailRetryInterval(this.failRetryInterval);
taskDefinition.setTimeout(this.timeout);
taskDefinition.setResourceIds(this.resourceIds);
taskDefinition.setTaskGroupId(this.taskGroupId);
taskDefinition.setTaskGroupPriority(this.taskGroupPriority);
taskDefinition.setCpuQuota(this.cpuQuota);
taskDefinition.setMemoryMax(this.memoryMax);
Flag flagCreate = this.flag == null ? Flag.YES : Flag.valueOf(this.flag);
taskDefinition.setFlag(flagCreate);
TimeoutFlag timeoutFlagCreate =
this.timeoutFlag == null ? TimeoutFlag.CLOSE : TimeoutFlag.valueOf(this.timeoutFlag);
taskDefinition.setTimeoutFlag(timeoutFlagCreate);
Priority taskPriorityCreate = this.taskPriority == null ? Priority.MEDIUM : Priority.valueOf(this.taskPriority);
taskDefinition.setTaskPriority(taskPriorityCreate);
TaskTimeoutStrategy taskTimeoutStrategyCreate = this.timeoutNotifyStrategy == null ? TaskTimeoutStrategy.WARN
: TaskTimeoutStrategy.valueOf(this.timeoutNotifyStrategy);
taskDefinition.setTimeoutNotifyStrategy(taskTimeoutStrategyCreate);
taskDefinition.setVersion(VERSION_FIRST);
Date date = new Date();
taskDefinition.setCreateTime(date);
taskDefinition.setUpdateTime(date);
return taskDefinition;
}
}

62
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskFilterRequest.java

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.task;
import org.apache.dolphinscheduler.api.dto.PageQueryDto;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**
* task query request
*/
@ApiModel("TASK-QUERY")
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
public class TaskFilterRequest extends PageQueryDto {
@ApiModelProperty(example = "project-name")
private String projectName;
@ApiModelProperty(example = "task-name")
private String name;
@ApiModelProperty(example = "SHELL")
private String taskType;
public TaskDefinition convert2TaskDefinition() {
TaskDefinition taskDefinition = new TaskDefinition();
if (this.projectName != null) {
taskDefinition.setProjectName(this.projectName);
}
if (this.name != null) {
taskDefinition.setName(this.name);
}
if (this.taskType != null) {
taskDefinition.setTaskType(this.taskType);
}
return taskDefinition;
}
}

175
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskUpdateRequest.java

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.task;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.commons.beanutils.BeanUtils;
import java.lang.reflect.InvocationTargetException;
import java.util.Date;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.swagger.annotations.ApiModelProperty;
/**
* task update request
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
public class TaskUpdateRequest {
@ApiModelProperty(example = "workflow-code", required = true)
private long workflowCode;
@ApiModelProperty(example = "task-name")
private String name;
@ApiModelProperty(example = "describe what this task actual do")
private String description;
@ApiModelProperty(example = "SHELL")
private String taskType;
// todo
@ApiModelProperty(example = "{\"localParams\": [], \"rawScript\": \"echo 1\", \"resourceList\": []}", notes = "task definition params")
private String taskParams;
@ApiModelProperty(example = "YES", allowableValues = "YES,NO", notes = "default YES is not provided")
private String flag;
@ApiModelProperty(example = "MEDIUM", allowableValues = "HIGHEST,HIGH,MEDIUM,LOW,LOWEST", notes = "default MEDIUM is not provided")
private String taskPriority;
@ApiModelProperty(example = "default", notes = "default 'default' if not provided")
private String workerGroup;
@ApiModelProperty(example = "6563415109312", dataType = "Long")
private long environmentCode;
@ApiModelProperty(example = "0", dataType = "Integer", notes = "default 0 not provided")
private int failRetryTimes;
@ApiModelProperty(example = "1")
private int failRetryInterval;
@ApiModelProperty(example = "SHELL")
private int timeout;
@ApiModelProperty(example = "CLOSE", allowableValues = "CLOSE,OPEN", notes = "default CLOSE is not provided")
private String timeoutFlag;
@ApiModelProperty(example = "MEDIUM", allowableValues = "WARN,FAILED,WARNFAILED", notes = "default MEDIUM is not provided")
private String timeoutNotifyStrategy;
@ApiModelProperty(example = "1,2,3")
private String resourceIds;
@ApiModelProperty(example = "2")
private int taskGroupId;
@ApiModelProperty(example = "1", dataType = "Integer", notes = "A priority number for execute task, the bigger the high priority, default null if not provided")
private int taskGroupPriority;
@ApiModelProperty(example = "0.1", dataType = "Float", notes = "default unlimited if not provided")
private Integer cpuQuota;
@ApiModelProperty(example = "0.1", dataType = "Float", notes = "default unlimited if not provided")
private Integer memoryMax;
@ApiModelProperty(example = "upstream-task-codes1,upstream-task-codes2", notes = "use , to split multiple upstream task codes")
private String upstreamTasksCodes;
/**
* Merge taskUpdateRequest information into exists task definition object
*
* @param taskDefinition exists task definition object
* @return task definition
*/
public TaskDefinition mergeIntoTaskDefinition(TaskDefinition taskDefinition) throws InvocationTargetException, IllegalAccessException, InstantiationException, NoSuchMethodException {
TaskDefinition taskDefinitionDeepCopy = (TaskDefinition) BeanUtils.cloneBean(taskDefinition);
assert taskDefinitionDeepCopy != null;
if (this.name != null) {
taskDefinitionDeepCopy.setName(this.name);
}
if (this.description != null) {
taskDefinitionDeepCopy.setDescription(this.description);
}
if (this.taskType != null) {
taskDefinitionDeepCopy.setTaskType(this.taskType);
}
if (this.taskParams != null) {
taskDefinitionDeepCopy.setTaskParams(this.taskParams);
}
if (this.flag != null) {
taskDefinitionDeepCopy.setFlag(Flag.valueOf(this.flag));
}
if (this.taskPriority != null) {
taskDefinitionDeepCopy.setTaskPriority(Priority.valueOf(this.taskPriority));
}
if (this.workerGroup != null) {
taskDefinitionDeepCopy.setWorkerGroup(this.workerGroup);
}
if (this.environmentCode != 0L) {
taskDefinitionDeepCopy.setEnvironmentCode(this.environmentCode);
}
if (this.failRetryTimes != 0) {
taskDefinitionDeepCopy.setFailRetryTimes(this.failRetryTimes);
}
if (this.failRetryInterval != 0) {
taskDefinitionDeepCopy.setFailRetryInterval(this.failRetryInterval);
}
if (this.timeout != 0) {
taskDefinitionDeepCopy.setTimeout(this.timeout);
}
if (this.timeoutFlag != null) {
taskDefinitionDeepCopy.setTimeoutFlag(TimeoutFlag.valueOf(this.timeoutFlag));
}
if (this.timeoutNotifyStrategy != null) {
taskDefinitionDeepCopy.setTimeoutNotifyStrategy(TaskTimeoutStrategy.valueOf(this.timeoutNotifyStrategy));
}
if (this.resourceIds != null) {
taskDefinitionDeepCopy.setResourceIds(this.resourceIds);
}
if (this.taskGroupId != 0) {
taskDefinitionDeepCopy.setTaskGroupId(this.taskGroupId);
}
if (this.taskGroupPriority != 0) {
taskDefinitionDeepCopy.setTaskGroupPriority(this.taskGroupPriority);
}
if (this.cpuQuota != null) {
taskDefinitionDeepCopy.setCpuQuota(this.cpuQuota);
}
if (this.memoryMax != null) {
taskDefinitionDeepCopy.setMemoryMax(this.memoryMax);
}
if (!taskDefinition.equals(taskDefinitionDeepCopy)) {
int version = taskDefinitionDeepCopy.getVersion() + 1;
taskDefinitionDeepCopy.setVersion(version);
taskDefinitionDeepCopy.setUpdateTime(new Date());
}
return taskDefinitionDeepCopy;
}
}

58
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationCreateRequest.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.taskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import java.util.Date;
import lombok.Data;
import io.swagger.annotations.ApiModelProperty;
/**
* task relation create request
*/
@Data
public class TaskRelationCreateRequest {
@ApiModelProperty(example = "12345678")
private long projectCode;
@ApiModelProperty(example = "87654321", required = true)
private long workflowCode;
@ApiModelProperty(example = "12345", required = true)
private long preTaskCode;
@ApiModelProperty(example = "54321", required = true)
private long postTaskCode;
public ProcessTaskRelation convert2ProcessTaskRelation() {
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(this.projectCode);
processTaskRelation.setProcessDefinitionCode(this.workflowCode);
processTaskRelation.setPreTaskCode(this.preTaskCode);
processTaskRelation.setPostTaskCode(this.postTaskCode);
Date date = new Date();
processTaskRelation.setCreateTime(date);
processTaskRelation.setUpdateTime(date);
return processTaskRelation;
}
}

51
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationDeleteRequest.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.taskRelation;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import java.util.stream.Stream;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.swagger.annotations.ApiModelProperty;
/**
* task relation want to delete request
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
public class TaskRelationDeleteRequest {
private long upstreamCode;
private long downstreamCode;
@ApiModelProperty(example = "12345678,87654321", required = true, notes = "relation pair want to delete relation, separated by comma")
private String codePair;
public TaskRelationDeleteRequest(String relationPair) {
long[] relations = Stream.of(relationPair.split(COMMA))
.map(String::trim)
.mapToLong(Long::parseLong)
.toArray();
this.upstreamCode = relations[0];
this.downstreamCode = relations[1];
}
}

73
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationFilterRequest.java

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.taskRelation;
import org.apache.dolphinscheduler.api.dto.PageQueryDto;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**
* task relation query request
*/
@ApiModel("TASK-RELATION-QUERY")
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
public class TaskRelationFilterRequest extends PageQueryDto {
@ApiModelProperty(example = "1234567890123")
private long workflowCode;
@ApiModelProperty(example = "1234567890123")
private long preTaskCode;
@ApiModelProperty(example = "1234567890123")
private long postTaskCode;
public TaskRelationFilterRequest(long workflowCode, long preTaskCode, long postTaskCode) {
this.workflowCode = workflowCode;
this.preTaskCode = preTaskCode;
this.postTaskCode = postTaskCode;
}
public TaskRelationFilterRequest(long preTaskCode, long postTaskCode) {
this.preTaskCode = preTaskCode;
this.postTaskCode = postTaskCode;
}
public ProcessTaskRelation convert2TaskDefinition() {
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
if (this.workflowCode != 0L) {
processTaskRelation.setProcessDefinitionCode(this.workflowCode);
}
if (this.preTaskCode != 0L) {
processTaskRelation.setPreTaskCode(this.preTaskCode);
}
if (this.postTaskCode != 0L) {
processTaskRelation.setPostTaskCode(this.postTaskCode);
}
return processTaskRelation;
}
}

54
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskRelation/TaskRelationUpdateUpstreamRequest.java

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.taskRelation;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import org.apache.dolphinscheduler.api.dto.PageQueryDto;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.swagger.annotations.ApiModelProperty;
/**
* task relation update request
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
public class TaskRelationUpdateUpstreamRequest extends PageQueryDto {
@ApiModelProperty(example = "1234587654321", notes = "workflow code ")
private long workflowCode;
@ApiModelProperty(example = "12345678,87654321", required = true, notes = "upstream you want to update separated by comma")
private String upstreams;
public List<Long> getUpstreams() {
return Stream.of(this.upstreams.split(COMMA))
.map(String::trim)
.map(Long::parseLong)
.collect(Collectors.toList());
}
}

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java

@ -62,6 +62,15 @@ public class WorkflowUpdateRequest {
@ApiModelProperty(allowableValues = "PARALLEL / SERIAL_WAIT / SERIAL_DISCARD / SERIAL_PRIORITY", example = "PARALLEL", notes = "default PARALLEL if not provide.")
private String executionType;
@ApiModelProperty(example = "[{\\\"taskCode\\\":7009653961024,\\\"x\\\":312,\\\"y\\\":196}]")
private String location;
/**
* Merge workflowUpdateRequest information into exists processDefinition object
*
* @param processDefinition exists processDefinition object
* @return process definition
*/
public ProcessDefinition mergeIntoProcessDefinition(ProcessDefinition processDefinition) {
ProcessDefinition processDefinitionDeepCopy =
JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
@ -90,6 +99,9 @@ public class WorkflowUpdateRequest {
if (this.executionType != null) {
processDefinitionDeepCopy.setExecutionType(ProcessExecutionTypeEnum.valueOf(this.executionType));
}
if (this.location != null) {
processDefinitionDeepCopy.setLocations(this.location);
}
int version = processDefinitionDeepCopy.getVersion() + 1;
processDefinitionDeepCopy.setVersion(version);

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -357,6 +357,19 @@ public enum Status {
BATCH_EXECUTE_PROCESS_INSTANCE_ERROR(50058, "change process instance status error: {0}", "修改工作实例状态错误: {0}"),
START_TASK_INSTANCE_ERROR(50059, "start task instance error", "运行任务流实例错误"),
DELETE_PROCESS_DEFINE_ERROR(50060, "delete process definition [{0}] error: {1}", "删除工作流定义[{0}]错误: {1}"),
CREATE_TASK_DEFINITION_LOG_ERROR(50061, "create task definition log {0} error", "创建任务操作记录 {0} 错误"),
DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR(50062, "delete task definition {0} error", "删除任务定义 {0} 错误"),
TASK_DEFINITION_NOT_CHANGE(50063, "task definition {0} do not change", "任务定义 {0} 没有变化"),
TASK_DEFINITION_NOT_EXISTS(50064, "task definition {0} do not exists", "任务定义 {0} 不存在"),
UPDATE_UPSTREAM_TASK_PROCESS_RELATION_ERROR(50065, "update task upstream relation error", "更新任务上游关系错误"),
CREATE_PROCESS_TASK_RELATION_LOG_ERROR(50066, "create process task relation log {0}-{1} error",
"创建任务关系日志 {0}-{1} 错误"),
PROCESS_TASK_RELATION_NOT_EXPECT(50067, "process task relation number not expect, expect {0} but get {1}",
"工作流任务关系数量不符合预期,预期 {0} 但是实际 {1}"),
PROCESS_TASK_RELATION_BATCH_DELETE_ERROR(50068, "batch delete process task relation {0} error",
"批量删除工作流任务关系 {0} 错误"),
PROCESS_TASK_RELATION_BATCH_CREATE_ERROR(50069, "batch create process task relation {0} error",
"批量创建工作流任务关系 {0} 错误"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"),

39
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java

@ -17,8 +17,12 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationCreateRequest;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.List;
import java.util.Map;
/**
@ -42,6 +46,16 @@ public interface ProcessTaskRelationService {
long preTaskCode,
long postTaskCode);
/**
* create resource process task relation
*
* @param loginUser login user
* @param taskRelationCreateRequest project code
* @return ProcessTaskRelation object
*/
ProcessTaskRelation createProcessTaskRelationV2(User loginUser,
TaskRelationCreateRequest taskRelationCreateRequest);
/**
* delete process task relation
*
@ -56,6 +70,28 @@ public interface ProcessTaskRelationService {
long processDefinitionCode,
long taskCode);
/**
* delete process task relation, will delete exists relation preTaskCode -> postTaskCode, throw error if not exists
*
* @param loginUser login user
* @param preTaskCode relation upstream code
* @param postTaskCode relation downstream code
*/
void deleteTaskProcessRelationV2(User loginUser,
long preTaskCode,
long postTaskCode);
/**
* delete process task relation, will delete exists relation upstream -> downstream, throw error if not exists
*
* @param loginUser login user
* @param taskCode relation upstream code
* @param taskRelationUpdateUpstreamRequest relation downstream code
*/
List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
long taskCode,
TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest);
/**
* delete task upstream relation
*
@ -118,5 +154,6 @@ public interface ProcessTaskRelationService {
* @param postTaskCode post task code
* @return delete result code
*/
Map<String, Object> deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode);
Map<String, Object> deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode,
long postTaskCode);
}

56
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java

@ -17,9 +17,14 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Map;
@ -40,6 +45,16 @@ public interface TaskDefinitionService {
long projectCode,
String taskDefinitionJson);
/**
* Create resource task definition
*
* @param loginUser login user
* @param taskCreateRequest task definition json
* @Return new TaskDefinition have created
*/
TaskDefinition createTaskDefinitionV2(User loginUser,
TaskCreateRequest taskCreateRequest);
/**
* create single task definition that binds the workflow
*
@ -70,14 +85,15 @@ public interface TaskDefinitionService {
String taskName);
/**
* delete task definition
* Delete resource task definition by code
*
* Only task release state offline and no downstream tasks can be deleted, will also remove the exists
* task relation [upstreamTaskCode, taskCode]
*
* @param loginUser login user
* @param projectCode project code
* @param taskCode task code
*/
Map<String, Object> deleteTaskDefinitionByCode(User loginUser,
long projectCode,
void deleteTaskDefinitionByCode(User loginUser,
long taskCode);
/**
@ -93,6 +109,38 @@ public interface TaskDefinitionService {
long taskCode,
String taskDefinitionJsonObj);
/**
* Update resource task definition by code
*
* @param loginUser login user
* @param taskCode task code
* @param taskUpdateRequest task definition json object
* @return new TaskDefinition have updated
*/
TaskDefinition updateTaskDefinitionV2(User loginUser,
long taskCode,
TaskUpdateRequest taskUpdateRequest);
/**
* Get resource task definition by code
*
* @param loginUser login user
* @param taskCode task code
* @return TaskDefinition
*/
TaskDefinition getTaskDefinition(User loginUser,
long taskCode);
/**
* Get resource task definition according to query parameter
*
* @param loginUser login user
* @param taskFilterRequest taskFilterRequest
* @return PageResourceResponse from condition
*/
PageInfo<TaskDefinition> filterTaskDefinition(User loginUser,
TaskFilterRequest taskFilterRequest);
/**
* update task definition and upstream
*

245
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -21,6 +21,9 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationCreateRequest;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationFilterRequest;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
@ -35,6 +38,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@ -59,6 +63,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
/**
@ -90,6 +96,9 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
@Autowired
private ProcessService processService;
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
/**
* create process task relation
*
@ -165,6 +174,85 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
private ProcessTaskRelationLog persist2ProcessTaskRelationLog(User user, ProcessTaskRelation processTaskRelation) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setOperator(user.getId());
processTaskRelationLog.setOperateTime(new Date());
int result = processTaskRelationLogMapper.insert(processTaskRelationLog);
if (result <= 0) {
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR,
processTaskRelationLog.getPreTaskCode(), processTaskRelationLog.getPostTaskCode());
}
return processTaskRelationLog;
}
private List<ProcessTaskRelationLog> batchPersist2ProcessTaskRelationLog(User user,
List<ProcessTaskRelation> processTaskRelations) {
Date now = new Date();
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setOperator(user.getId());
processTaskRelationLog.setOperateTime(now);
processTaskRelationLogs.add(processTaskRelationLog);
}
int result = processTaskRelationLogMapper.batchInsert(processTaskRelationLogs);
if (result != processTaskRelationLogs.size()) {
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR);
}
return processTaskRelationLogs;
}
private void updateVersions(ProcessTaskRelation processTaskRelation) {
// workflow
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processTaskRelation.getProcessDefinitionCode());
processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
// tasks
TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(processTaskRelation.getPreTaskCode());
processTaskRelation.setPreTaskVersion(preTaskDefinition.getVersion());
TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(processTaskRelation.getPostTaskCode());
processTaskRelation.setPostTaskVersion(postTaskDefinition.getVersion());
}
/**
* create resource process task relation
*
* @param loginUser login user
* @param taskRelationCreateRequest project code
* @return ProcessTaskRelation object
*/
@Override
@Transactional
public ProcessTaskRelation createProcessTaskRelationV2(User loginUser,
TaskRelationCreateRequest taskRelationCreateRequest) {
ProcessTaskRelation processTaskRelation = taskRelationCreateRequest.convert2ProcessTaskRelation();
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processTaskRelation.getProcessDefinitionCode());
if (processDefinition == null) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
String.valueOf(processTaskRelation.getProcessDefinitionCode()));
}
if (processTaskRelation.getProjectCode() == 0) {
processTaskRelation.setProjectCode(processDefinition.getProjectCode());
}
Project project = projectMapper.queryByCode(processTaskRelation.getProjectCode());
projectService.checkProjectAndAuthThrowException(loginUser, project, null);
// persistence process task relation and process task relation log to database
this.updateVersions(processTaskRelation);
int insert = processTaskRelationMapper.insert(processTaskRelation);
if (insert <= 0) {
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR, processTaskRelation.getPreTaskCode(),
processTaskRelation.getPostTaskCode());
}
this.persist2ProcessTaskRelationLog(loginUser, processTaskRelation);
return processTaskRelation;
}
private ProcessTaskRelation setRelation(ProcessDefinition processDefinition, TaskDefinition taskDefinition) {
Date now = new Date();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
@ -184,11 +272,13 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
ProcessDefinition processDefinition) {
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion <= 0) {
logger.error("Update process definition error, projectCode:{}, processDefinitionCode:{}.", processDefinition.getProjectCode(), processDefinition.getCode());
logger.error("Update process definition error, projectCode:{}, processDefinitionCode:{}.",
processDefinition.getProjectCode(), processDefinition.getCode());
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} else
logger.info("Update process definition complete, new version is {}, projectCode:{}, processDefinitionCode:{}.",
logger.info(
"Update process definition complete, new version is {}, projectCode:{}, processDefinitionCode:{}.",
insertVersion, processDefinition.getProjectCode(), processDefinition.getCode());
processDefinition.setVersion(insertVersion);
}
@ -213,7 +303,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
if (taskCode == 0) {
logger.error("Delete task process relation error due to parameter taskCode is 0, projectCode:{}, processDefinitionCode:{}.",
logger.error(
"Delete task process relation error due to parameter taskCode is 0, projectCode:{}, processDefinitionCode:{}.",
projectCode, processDefinitionCode);
putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR);
return result;
@ -234,7 +325,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
if (CollectionUtils.isEmpty(processTaskRelationList)) {
logger.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinitionCode);
logger.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefinitionCode);
putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
return result;
}
@ -249,7 +341,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
if (CollectionUtils.isNotEmpty(downstreamList)) {
String downstream = StringUtils.join(downstreamList, ",");
logger.warn("Relation can not be deleted because task has downstream tasks:[{}], projectCode:{}, processDefinitionCode:{}, taskDefinitionCode:{}.",
logger.warn(
"Relation can not be deleted because task has downstream tasks:[{}], projectCode:{}, processDefinitionCode:{}, taskDefinitionCode:{}.",
downstream, projectCode, processDefinitionCode, taskCode);
putMsg(result, Status.TASK_HAS_DOWNSTREAM, downstream);
return result;
@ -265,12 +358,125 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
} else
logger.info("Delete {} type task definition complete, taskDefinitionCode:{}.", taskDefinition.getTaskType(), taskCode);
logger.info("Delete {} type task definition complete, taskDefinitionCode:{}.",
taskDefinition.getTaskType(), taskCode);
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* delete process task relation, will delete exists relation preTaskCode -> postTaskCode, throw error if not exists
*
* @param loginUser login user
* @param preTaskCode relation upstream code
* @param postTaskCode relation downstream code
*/
@Override
@Transactional
public void deleteTaskProcessRelationV2(User loginUser,
long preTaskCode,
long postTaskCode) {
ProcessTaskRelation processTaskRelation =
new TaskRelationFilterRequest(preTaskCode, postTaskCode).convert2TaskDefinition();
Page<ProcessTaskRelation> page =
new Page<>(new TaskRelationFilterRequest(preTaskCode, postTaskCode).getPageNo(),
new TaskRelationFilterRequest(preTaskCode, postTaskCode).getPageSize());
IPage<ProcessTaskRelation> processTaskRelationIPage =
processTaskRelationMapper.filterProcessTaskRelation(page, processTaskRelation);
List<ProcessTaskRelation> processTaskRelations = processTaskRelationIPage.getRecords();
if (processTaskRelations.size() != 1) {
throw new ServiceException(Status.PROCESS_TASK_RELATION_NOT_EXPECT, 1, processTaskRelations.size());
}
ProcessTaskRelation processTaskRelationDb = processTaskRelations.get(0);
Project project = projectMapper.queryByCode(processTaskRelationDb.getProjectCode());
projectService.checkProjectAndAuthThrowException(loginUser, project, null);
processTaskRelationMapper.deleteById(processTaskRelationDb.getId());
}
/**
* delete process task relation, will delete exists relation upstream -> downstream, throw error if not exists
*
* @param loginUser login user
* @param taskCode relation upstream code
* @param taskRelationUpdateUpstreamRequest relation downstream code
*/
@Override
@Transactional
public List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
long taskCode,
TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) {
TaskDefinition downstreamTask = taskDefinitionMapper.queryByCode(taskCode);
if (downstreamTask == null) {
throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode);
}
List<Long> upstreamTaskCodes = taskRelationUpdateUpstreamRequest.getUpstreams();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setPostTaskCode(taskCode);
Page<ProcessTaskRelation> page = new Page<>(taskRelationUpdateUpstreamRequest.getPageNo(),
taskRelationUpdateUpstreamRequest.getPageSize());
IPage<ProcessTaskRelation> processTaskRelationExistsIPage =
processTaskRelationMapper.filterProcessTaskRelation(page, processTaskRelation);
List<ProcessTaskRelation> processTaskRelationExists = processTaskRelationExistsIPage.getRecords();
ProcessDefinition processDefinition = null;
if (CollectionUtils.isNotEmpty(processTaskRelationExists)) {
processDefinition =
processDefinitionMapper.queryByCode(processTaskRelationExists.get(0).getProcessDefinitionCode());
} else if (taskRelationUpdateUpstreamRequest.getWorkflowCode() != 0L) {
processDefinition =
processDefinitionMapper.queryByCode(taskRelationUpdateUpstreamRequest.getWorkflowCode());
}
if (processDefinition == null) {
throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR,
taskRelationUpdateUpstreamRequest.toString());
}
// get new relation to create and out of date relation to delete
List<Long> taskCodeCreates = upstreamTaskCodes
.stream()
.filter(upstreamTaskCode -> processTaskRelationExists.stream().noneMatch(
processTaskRelationExist -> processTaskRelationExist.getPreTaskCode() == upstreamTaskCode))
.collect(Collectors.toList());
List<Long> taskCodeDeletes = processTaskRelationExists
.stream()
.map(ProcessTaskRelation::getPreTaskCode)
.filter(preTaskCode -> !upstreamTaskCodes.contains(preTaskCode))
.collect(Collectors.toList());
// delete relation not exists
if (CollectionUtils.isNotEmpty(taskCodeDeletes)) {
int delete = processTaskRelationMapper.deleteBatchIds(taskCodeDeletes);
if (delete != taskCodeDeletes.size()) {
throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_DELETE_ERROR, taskCodeDeletes);
}
}
// create relation not exists
List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
for (long createCode : taskCodeCreates) {
TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode);
ProcessTaskRelation processTaskRelationCreate =
new ProcessTaskRelation(null, processDefinition.getVersion(), downstreamTask.getProjectCode(),
processDefinition.getCode(), upstreamTask.getCode(), upstreamTask.getVersion(),
downstreamTask.getCode(), downstreamTask.getVersion(), null, null);
processTaskRelations.add(processTaskRelationCreate);
}
int batchInsert = processTaskRelationMapper.batchInsert(processTaskRelations);
if (batchInsert != processTaskRelations.size()) {
throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_CREATE_ERROR, taskCodeCreates);
}
// batch sync to process task relation log
this.batchPersist2ProcessTaskRelationLog(loginUser, processTaskRelations);
return processTaskRelations;
}
private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition,
List<ProcessTaskRelation> processTaskRelationList) {
List<ProcessTaskRelationLog> relationLogs =
@ -279,12 +485,14 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
processDefinition.getCode(),
processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
logger.info("Update task relations complete, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.",
logger.info(
"Update task relations complete, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion());
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
logger.error("Update task relations error, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.",
logger.error(
"Update task relations error, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion());
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
@ -348,7 +556,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode());
if (processDefinition == null) {
logger.error("Process definition does not exist, processDefinitionCode:{}.", upstreamList.get(0).getProcessDefinitionCode());
logger.error("Process definition does not exist, processDefinitionCode:{}.",
upstreamList.get(0).getProcessDefinitionCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
String.valueOf(upstreamList.get(0).getProcessDefinitionCode()));
return result;
@ -418,7 +627,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(downstreamList.get(0).getProcessDefinitionCode());
if (processDefinition == null) {
logger.error("Process definition does not exist, processDefinitionCode:{}.", downstreamList.get(0).getProcessDefinitionCode());
logger.error("Process definition does not exist, processDefinitionCode:{}.",
downstreamList.get(0).getProcessDefinitionCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
String.valueOf(downstreamList.get(0).getProcessDefinitionCode()));
return result;
@ -530,7 +740,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinitionCode);
logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
return result;
}
@ -538,7 +749,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
if (CollectionUtils.isEmpty(processTaskRelationList)) {
logger.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinitionCode);
logger.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefinitionCode);
putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
return result;
}
@ -561,12 +773,14 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
if (processTaskRelation.getPreTaskCode() == preTaskCode) {
int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId());
if (delete == 0) {
logger.error("Delete task relation edge error, processTaskRelationId:{}, preTaskCode:{}, postTaskCode:{}",
logger.error(
"Delete task relation edge error, processTaskRelationId:{}, preTaskCode:{}, postTaskCode:{}",
processTaskRelation.getId(), preTaskCode, postTaskCode);
putMsg(result, Status.DELETE_EDGE_ERROR);
throw new ServiceException(Status.DELETE_EDGE_ERROR);
} else
logger.info("Delete task relation edge complete, processTaskRelationId:{}, preTaskCode:{}, postTaskCode:{}",
logger.info(
"Delete task relation edge complete, processTaskRelationId:{}, preTaskCode:{}, postTaskCode:{}",
processTaskRelation.getId(), preTaskCode, postTaskCode);
processTaskRelationList.remove(processTaskRelation);
}
@ -577,7 +791,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
processTaskRelation.setPreTaskVersion(0);
processTaskRelation.setPreTaskCode(0L);
processTaskRelationList.add(processTaskRelation);
logger.info("Delete task relation through set invalid value for it: preTaskCode from {} to 0, processTaskRelationId:{}.",
logger.info(
"Delete task relation through set invalid value for it: preTaskCode from {} to 0, processTaskRelationId:{}.",
preTaskCode, processTaskRelation.getId());
}
updateProcessDefiniteVersion(loginUser, result, processDefinition);

331
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -22,11 +22,19 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_VERSION_VIEW;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.permission.PermissionCheck;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -60,6 +68,7 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -105,6 +114,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessTaskRelationService processTaskRelationService;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@ -114,6 +126,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private ProcessDefinitionService processDefinitionService;
/**
* create task definition
*
@ -166,6 +181,91 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
private TaskDefinitionLog persist2TaskDefinitionLog(User user, TaskDefinition taskDefinition) {
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition);
taskDefinitionLog.setOperator(user.getId());
taskDefinitionLog.setOperateTime(new Date());
int result = taskDefinitionLogMapper.insert(taskDefinitionLog);
if (result <= 0) {
throw new ServiceException(Status.CREATE_TASK_DEFINITION_LOG_ERROR, taskDefinitionLog.getName());
}
return taskDefinitionLog;
}
private void checkTaskDefinitionValid(User user, TaskDefinition taskDefinition, String permissions) {
// check user access for project
Project project = projectMapper.queryByCode(taskDefinition.getProjectCode());
projectService.checkProjectAndAuthThrowException(user, project, permissions);
if (!taskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinition.getTaskType())
.taskParams(taskDefinition.getTaskParams())
.dependence(taskDefinition.getDependence())
.build())) {
throw new ServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName());
}
}
private List<ProcessTaskRelation> updateTaskUpstreams(User user, long workflowCode, long taskCode,
String upstreamCodes) {
TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest = new TaskRelationUpdateUpstreamRequest();
taskRelationUpdateUpstreamRequest.setWorkflowCode(workflowCode);
taskRelationUpdateUpstreamRequest.setUpstreams(upstreamCodes);
return processTaskRelationService.updateUpstreamTaskDefinition(user, taskCode,
taskRelationUpdateUpstreamRequest);
}
private ProcessDefinition updateWorkflowLocation(User user, ProcessDefinition processDefinition) {
WorkflowUpdateRequest workflowUpdateRequest = new WorkflowUpdateRequest();
workflowUpdateRequest.setLocation(null);
return processDefinitionService.updateSingleProcessDefinition(user, processDefinition.getCode(),
workflowUpdateRequest);
}
/**
* Create resource task definition
*
* @param loginUser login user
* @param taskCreateRequest task definition json
* @return new TaskDefinition have created
*/
@Override
@Transactional
public TaskDefinition createTaskDefinitionV2(User loginUser,
TaskCreateRequest taskCreateRequest) {
TaskDefinition taskDefinition = taskCreateRequest.convert2TaskDefinition();
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(taskCreateRequest.getWorkflowCode());
if (processDefinition == null) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, taskCreateRequest.getWorkflowCode());
}
// Add project code from process definition if not exists
if (taskDefinition.getProjectCode() == 0L) {
taskDefinition.setProjectCode(processDefinition.getProjectCode());
}
this.checkTaskDefinitionValid(loginUser, taskDefinition, TASK_DEFINITION_CREATE);
long taskDefinitionCode;
try {
taskDefinitionCode = CodeGenerateUtils.getInstance().genCode();
} catch (CodeGenerateException e) {
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
}
taskDefinition.setCode(taskDefinitionCode);
int create = taskDefinitionMapper.insert(taskDefinition);
if (create <= 0) {
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
this.persist2TaskDefinitionLog(loginUser, taskDefinition);
// update related objects: task relationship, workflow's location(need to set to null and front-end will auto
// format it)
this.updateTaskUpstreams(loginUser, taskCreateRequest.getWorkflowCode(), taskDefinition.getCode(),
taskCreateRequest.getUpstreamTasksCodes());
this.updateWorkflowLocation(loginUser, processDefinition);
return taskDefinition;
}
/**
* create single task definition that binds the workflow
*
@ -323,82 +423,71 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
/**
* delete task definition
* Only offline and no downstream dependency can be deleted
*
* @param loginUser login user
* @param projectCode project code
* @param taskCode task code
* Whether task definition can be deleted or not
*/
@Transactional
@Override
public Map<String, Object> deleteTaskDefinitionByCode(User loginUser, long projectCode, long taskCode) {
Project project = projectMapper.queryByCode(projectCode);
private void taskCanDeleteValid(User user, TaskDefinition taskDefinition) {
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_DELETE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
if (taskCode == 0) {
logger.warn("Parameter taskCode 0 is invalid.");
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
logger.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
return result;
}
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
logger.warn("Task definition can not be deleted due to task state online, taskDefinitionCode:{}.",
taskCode);
putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode);
return result;
Project project = projectMapper.queryByCode(taskDefinition.getProjectCode());
projectService.checkProjectAndAuthThrowException(user, project, TASK_DEFINITION_DELETE);
// Whether task relation workflow is online
if (processService.isTaskOnline(taskDefinition.getCode()) && taskDefinition.getFlag() == Flag.YES) {
throw new ServiceException(Status.TASK_DEFINE_STATE_ONLINE, taskDefinition.getCode());
}
// Whether task have downstream tasks
List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryDownstreamByTaskCode(taskCode);
processTaskRelationMapper.queryDownstreamByTaskCode(taskDefinition.getCode());
if (!processTaskRelationList.isEmpty()) {
Set<Long> postTaskCodes = processTaskRelationList
.stream()
.map(ProcessTaskRelation::getPostTaskCode)
.collect(Collectors.toSet());
String postTaskCodesStr = StringUtils.join(postTaskCodes, ",");
logger.warn(
"Task definition can not be deleted due to downstream tasks, taskDefinitionCode:{}, downstreamTaskCodes:{}",
taskCode, postTaskCodesStr);
putMsg(result, Status.TASK_HAS_DOWNSTREAM, postTaskCodesStr);
return result;
String postTaskCodesStr = StringUtils.join(postTaskCodes, Constants.COMMA);
throw new ServiceException(Status.TASK_HAS_DOWNSTREAM, postTaskCodesStr);
}
}
/**
* Delete resource task definition by code
*
* Only task release state offline and no downstream tasks can be deleted, will also remove the exists
* task relation [upstreamTaskCode, taskCode]
*
* @param loginUser login user
* @param taskCode task code
*/
@Transactional
@Override
public void deleteTaskDefinitionByCode(User loginUser, long taskCode) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode);
}
this.taskCanDeleteValid(loginUser, taskDefinition);
int delete = taskDefinitionMapper.deleteByCode(taskCode);
if (delete > 0) {
if (delete <= 0) {
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR, taskDefinition.getCode());
}
// Delete task upstream tasks if exists
List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
processTaskRelationMapper.queryUpstreamByCode(taskDefinition.getProjectCode(), taskCode);
if (!taskRelationList.isEmpty()) {
logger.info(
logger.debug(
"Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}.",
taskCode);
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
.queryByProcessCode(taskDefinition.getProjectCode(), processDefinitionCode);
List<ProcessTaskRelation> relationList = processTaskRelations.stream()
.filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList());
updateDag(loginUser, result, processDefinitionCode, relationList, Lists.newArrayList());
} else {
logger.info("Task definition delete complete, projectCode:{}, taskDefinitionCode:{}.", projectCode,
taskCode);
putMsg(result, Status.SUCCESS);
}
} else {
logger.error("Task definition delete error, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode);
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
updateDag(loginUser, processDefinitionCode, relationList, Lists.newArrayList());
}
return result;
}
private void updateDag(User loginUser, Map<String, Object> result, long processDefinitionCode,
private void updateDag(User loginUser, long processDefinitionCode,
List<ProcessTaskRelation> processTaskRelationList,
List<TaskDefinitionLog> taskDefinitionLogs) {
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
@ -424,12 +513,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
logger.info(
"Save new version task relations complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}.",
processDefinition.getProjectCode(), processDefinitionCode, insertVersion);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
logger.error("Update task relations error, projectCode:{}, processDefinitionCode:{}.",
processDefinition.getProjectCode(), processDefinitionCode);
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
}
@ -461,7 +547,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
updateDag(loginUser, result, processDefinitionCode, processTaskRelations,
updateDag(loginUser, processDefinitionCode, processTaskRelations,
Lists.newArrayList(taskDefinitionToUpdate));
}
logger.info("Update task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode);
@ -470,6 +556,129 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
private void TaskDefinitionUpdateValid(TaskDefinition taskDefinitionOriginal, TaskDefinition taskDefinitionUpdate) {
// Task already online
if (processService.isTaskOnline(taskDefinitionOriginal.getCode())
&& taskDefinitionOriginal.getFlag() == Flag.YES) {
// if stream, can update task definition without online check
if (taskDefinitionOriginal.getTaskExecuteType() != TaskExecuteType.STREAM) {
throw new ServiceException(Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
}
}
// not update anything
if (taskDefinitionOriginal.equals(taskDefinitionUpdate)) {
throw new ServiceException(Status.TASK_DEFINITION_NOT_CHANGE, taskDefinitionOriginal.getCode());
}
// check version invalid
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionOriginal.getCode());
if (version == null || version == 0) {
throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionOriginal.getCode());
}
}
/**
* update task definition
*
* @param loginUser login user
* @param taskCode task code
* @param taskUpdateRequest task definition json object
* @return new TaskDefinition have updated
*/
@Transactional
@Override
public TaskDefinition updateTaskDefinitionV2(User loginUser,
long taskCode,
TaskUpdateRequest taskUpdateRequest) {
TaskDefinition taskDefinitionOriginal = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinitionOriginal == null) {
throw new ServiceException(Status.TASK_DEFINITION_NOT_EXISTS, taskCode);
}
TaskDefinition taskDefinitionUpdate;
try {
taskDefinitionUpdate = taskUpdateRequest.mergeIntoTaskDefinition(taskDefinitionOriginal);
} catch (InvocationTargetException | IllegalAccessException | InstantiationException
| NoSuchMethodException e) {
throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, taskUpdateRequest.toString());
}
this.checkTaskDefinitionValid(loginUser, taskDefinitionUpdate, TASK_DEFINITION_UPDATE);
this.TaskDefinitionUpdateValid(taskDefinitionOriginal, taskDefinitionUpdate);
int update = taskDefinitionMapper.updateById(taskDefinitionUpdate);
if (update <= 0) {
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
TaskDefinitionLog taskDefinitionLog = this.persist2TaskDefinitionLog(loginUser, taskDefinitionUpdate);
List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(taskDefinitionUpdate.getProjectCode(), taskCode);
if (!taskRelationList.isEmpty()) {
logger.info(
"Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.",
taskCode);
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
.queryByProcessCode(taskDefinitionUpdate.getProjectCode(), processDefinitionCode);
updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionLog));
}
this.updateTaskUpstreams(loginUser, taskUpdateRequest.getWorkflowCode(), taskDefinitionUpdate.getCode(),
taskUpdateRequest.getUpstreamTasksCodes());
return taskDefinitionUpdate;
}
/**
* Get resource task definition by code
*
* @param loginUser login user
* @param taskCode task code
* @return TaskDefinition
*/
@Override
public TaskDefinition getTaskDefinition(User loginUser,
long taskCode) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode);
}
Project project = projectMapper.queryByCode(taskDefinition.getProjectCode());
projectService.checkProjectAndAuthThrowException(loginUser, project, TASK_DEFINITION);
return taskDefinition;
}
/**
* Get resource task definition according to query parameter
*
* @param loginUser login user
* @param taskFilterRequest taskFilterRequest object you want to filter the resource task definitions
* @return TaskDefinitions of page
*/
@Override
public PageInfo<TaskDefinition> filterTaskDefinition(User loginUser,
TaskFilterRequest taskFilterRequest) {
TaskDefinition taskDefinition = taskFilterRequest.convert2TaskDefinition();
if (taskDefinition.getProjectName() != null) {
Project project = projectMapper.queryByName(taskDefinition.getProjectName());
// check user access for project
projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);
taskDefinition.setProjectCode(project.getCode());
}
Page<TaskDefinition> page =
new Page<>(taskFilterRequest.getPageNo(), taskFilterRequest.getPageSize());
IPage<TaskDefinition> taskDefinitionIPage =
taskDefinitionMapper.filterTaskDefinition(page, taskDefinition);
PageInfo<TaskDefinition> pageInfo =
new PageInfo<>(taskFilterRequest.getPageNo(), taskFilterRequest.getPageSize());
pageInfo.setTotal((int) taskDefinitionIPage.getTotal());
pageInfo.setTotalList(taskDefinitionIPage.getRecords());
return pageInfo;
}
private TaskDefinitionLog updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj,
Map<String, Object> result) {
Project project = projectMapper.queryByCode(projectCode);
@ -628,7 +837,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (queryUpStreamTaskCodeMap.isEmpty() && !processTaskRelationList.isEmpty()) {
processTaskRelationList.add(processTaskRelationList.get(0));
}
updateDag(loginUser, result, taskRelation.getProcessDefinitionCode(), processTaskRelations,
updateDag(loginUser, taskRelation.getProcessDefinitionCode(), processTaskRelations,
Lists.newArrayList(taskDefinitionToUpdate));
}
logger.info(
@ -686,7 +895,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
updateDag(loginUser, result, processDefinitionCode, processTaskRelations,
updateDag(loginUser, processDefinitionCode, processTaskRelations,
Lists.newArrayList(taskDefinitionUpdate));
} else {
logger.info(

251
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java

@ -17,7 +17,11 @@
package org.apache.dolphinscheduler.api.service;
import static org.mockito.ArgumentMatchers.isA;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationCreateRequest;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.ProcessTaskRelationServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
@ -41,6 +45,7 @@ import org.apache.commons.collections.CollectionUtils;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -49,13 +54,17 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
/**
@ -91,17 +100,21 @@ public class ProcessTaskRelationServiceTest {
@Mock
private ProcessService processService;
/**
* get Mock Admin User
*
* @return admin user
*/
private User getAdminUser() {
private static final long PROJECT_CODE = 1L;
private static final long PROCESS_DEFINITION_CODE = 2L;
private static final long UPSTREAM_TASK_CODE = 3L;
private static final long DOWNSTREAM_TASK_CODE = 4L;
protected User user;
protected Exception exception;
@Before
public void before() {
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserName("admin");
loginUser.setId(1);
loginUser.setTenantId(2);
loginUser.setUserType(UserType.GENERAL_USER);
return loginUser;
loginUser.setUserName("admin");
user = loginUser;
}
/**
@ -131,6 +144,7 @@ public class ProcessTaskRelationServiceTest {
private TaskDefinitionLog buildTaskDefinitionLog(long projectCode, long code, int version) {
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog() {
@Override
public boolean equals(Object o) {
if (this == o) {
@ -159,6 +173,7 @@ public class ProcessTaskRelationServiceTest {
private TaskDefinition buildTaskDefinition(long projectCode, long code, int version) {
TaskDefinition taskDefinition = new TaskDefinition() {
@Override
public boolean equals(Object o) {
if (this == o) {
@ -210,7 +225,7 @@ public class ProcessTaskRelationServiceTest {
return processTaskRelationList;
}
private List<ProcessTaskRelation> getProcessTaskDownstreamRelationList(long projectCode,long taskCode) {
private List<ProcessTaskRelation> getProcessTaskDownstreamRelationList(long projectCode, long taskCode) {
ProcessTaskRelation processTaskRelationDownstream0 = new ProcessTaskRelation();
processTaskRelationDownstream0.setPreTaskCode(taskCode);
processTaskRelationDownstream0.setPostTaskCode(456);
@ -264,27 +279,29 @@ public class ProcessTaskRelationServiceTest {
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result);
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(Lists.newArrayList());
Mockito.when(
processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode))
.thenReturn(Lists.newArrayList());
Mockito.when(taskDefinitionMapper.queryByCode(postTaskCode)).thenReturn(getTaskDefinition());
List<ProcessTaskRelationLog> processTaskRelationList = Lists.newArrayList();
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(projectCode);
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
processTaskRelationLog.setPreTaskCode(0L);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLog.setPostTaskCode(postTaskCode);
processTaskRelationLog.setPostTaskVersion(1);
processTaskRelationList.add(processTaskRelationLog);
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
List<ProcessTaskRelationLog> processTaskRelationLogList = Lists.newArrayList();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(projectCode);
processTaskRelation.setProcessDefinitionCode(processDefinitionCode);
processTaskRelation.setPreTaskCode(0L);
processTaskRelation.setPreTaskVersion(0);
processTaskRelation.setPostTaskCode(postTaskCode);
processTaskRelation.setPostTaskVersion(1);
processTaskRelationList.add(processTaskRelation);
processTaskRelationLogList.add(new ProcessTaskRelationLog(processTaskRelation));
Mockito.when(processTaskRelationMapper.batchInsert(processTaskRelationList)).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.batchInsert(processTaskRelationList)).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.batchInsert(processTaskRelationLogList)).thenReturn(1);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@ -296,17 +313,13 @@ public class ProcessTaskRelationServiceTest {
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result);
List<ProcessTaskRelation> processTaskRelationList = getProcessTaskDownstreamRelationList(projectCode,taskCode);
List<ProcessTaskRelation> processTaskRelationList = getProcessTaskDownstreamRelationList(projectCode, taskCode);
Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode,taskCode))
Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode))
.thenReturn(processTaskRelationList);
if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
@ -327,8 +340,7 @@ public class ProcessTaskRelationServiceTest {
TaskDefinitionLog taskDefinitionLog = buildTaskDefinitionLog(
processTaskRelation.getProjectCode(),
processTaskRelation.getPostTaskCode(),
processTaskRelation.getPostTaskVersion()
);
processTaskRelation.getPostTaskVersion());
return taskDefinitionLog;
})
.collect(Collectors.toSet());
@ -337,7 +349,7 @@ public class ProcessTaskRelationServiceTest {
.thenReturn(taskDefinitionLogList);
}
Map<String, Object> relation = processTaskRelationService
.queryDownstreamRelation(loginUser, projectCode, taskCode);
.queryDownstreamRelation(user, projectCode, taskCode);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
Assert.assertEquals(2, ((List) relation.get("data")).size());
}
@ -350,16 +362,13 @@ public class ProcessTaskRelationServiceTest {
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result);
List<ProcessTaskRelation> processTaskRelationList = getProcessTaskUpstreamRelationList(projectCode,taskCode);
Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result);
List<ProcessTaskRelation> processTaskRelationList = getProcessTaskUpstreamRelationList(projectCode, taskCode);
Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode)).thenReturn(processTaskRelationList);
Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode))
.thenReturn(processTaskRelationList);
if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
Set<TaskDefinition> taskDefinitions = processTaskRelationList
@ -388,7 +397,7 @@ public class ProcessTaskRelationServiceTest {
.thenReturn(taskDefinitionLogList);
}
Map<String, Object> relation = processTaskRelationService
.queryUpstreamRelation(loginUser, projectCode, taskCode);
.queryUpstreamRelation(user, projectCode, taskCode);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
Assert.assertEquals(2, ((List) relation.get("data")).size());
}
@ -400,12 +409,9 @@ public class ProcessTaskRelationServiceTest {
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result);
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(projectCode);
@ -413,14 +419,17 @@ public class ProcessTaskRelationServiceTest {
processTaskRelation.setPreTaskCode(taskCode);
processTaskRelation.setPostTaskCode(123L);
processTaskRelationList.add(processTaskRelation);
Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode)).thenReturn(processTaskRelationList);
Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode))
.thenReturn(processTaskRelationList);
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
ProcessDefinition processDefinition = getProcessDefinition();
Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(processDefinition);
Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)).thenReturn(1);
Map<String, Object> result1 = processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode, "123", taskCode);
Mockito.when(processService.saveProcessDefine(user, processDefinition, Boolean.TRUE, Boolean.TRUE))
.thenReturn(1);
Map<String, Object> result1 =
processTaskRelationService.deleteDownstreamRelation(user, projectCode, "123", taskCode);
Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
}
@ -431,9 +440,6 @@ public class ProcessTaskRelationServiceTest {
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
@ -445,12 +451,14 @@ public class ProcessTaskRelationServiceTest {
processTaskRelation.setPostTaskCode(taskCode);
processTaskRelation.setPostTaskVersion(1);
processTaskRelationList.add(processTaskRelation);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result);
Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode)).thenReturn(processTaskRelationList);
Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result);
Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode))
.thenReturn(processTaskRelationList);
Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(getProcessDefinition());
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList);
List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L,
List<ProcessTaskRelationLog> relationLogs =
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
Mockito.when(processService.saveTaskRelation(user, 1L, 1L,
1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@ -465,13 +473,12 @@ public class ProcessTaskRelationServiceTest {
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result);
Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(Lists.newArrayList());
Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result);
Mockito.when(
processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode))
.thenReturn(Lists.newArrayList());
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
TaskDefinition taskDefinition = new TaskDefinition();
@ -486,9 +493,11 @@ public class ProcessTaskRelationServiceTest {
processTaskRelation.setPostTaskCode(taskCode);
processTaskRelation.setPostTaskVersion(1);
processTaskRelationList.add(processTaskRelation);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode)).thenReturn(processTaskRelationList);
List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L,
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode))
.thenReturn(processTaskRelationList);
List<ProcessTaskRelationLog> relationLogs =
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
Mockito.when(processService.saveTaskRelation(user, 1L, 1L,
1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@ -502,12 +511,9 @@ public class ProcessTaskRelationServiceTest {
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result);
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(projectCode);
processTaskRelation.setProcessDefinitionCode(processDefinitionCode);
@ -515,14 +521,111 @@ public class ProcessTaskRelationServiceTest {
processTaskRelation.setPreTaskCode(preTaskCode);
processTaskRelation.setPostTaskCode(postTaskCode);
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperator(user.getId());
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
processTaskRelationList.add(processTaskRelation);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList);
List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L,
List<ProcessTaskRelationLog> relationLogs =
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
Mockito.when(processService.saveTaskRelation(user, 1L, 1L,
1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test
public void testCreateProcessTaskRelationV2() {
TaskRelationCreateRequest taskRelationCreateRequest = new TaskRelationCreateRequest();
taskRelationCreateRequest.setWorkflowCode(PROCESS_DEFINITION_CODE);
// error process definition not exists
exception = Assertions.assertThrows(ServiceException.class,
() -> processTaskRelationService.createProcessTaskRelationV2(user, taskRelationCreateRequest));
Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// error project without permissions
Mockito.when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition());
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject(PROJECT_CODE));
Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService)
.checkProjectAndAuthThrowException(user, getProject(PROJECT_CODE), null);
exception = Assertions.assertThrows(ServiceException.class,
() -> processTaskRelationService.createProcessTaskRelationV2(user, taskRelationCreateRequest));
Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
((ServiceException) exception).getCode());
// error insert process task relation
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(PROJECT_CODE),
null);
Mockito.when(processTaskRelationMapper.insert(isA(ProcessTaskRelation.class))).thenReturn(0);
Mockito.when(taskDefinitionMapper.queryByCode(isA(Long.class))).thenReturn(getTaskDefinition());
exception = Assertions.assertThrows(ServiceException.class,
() -> processTaskRelationService.createProcessTaskRelationV2(user, taskRelationCreateRequest));
Assertions.assertEquals(Status.CREATE_PROCESS_TASK_RELATION_ERROR.getCode(),
((ServiceException) exception).getCode());
// error insert process task relation log
Mockito.when(processTaskRelationMapper.insert(isA(ProcessTaskRelation.class))).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.insert(isA(ProcessTaskRelationLog.class))).thenReturn(0);
exception = Assertions.assertThrows(ServiceException.class,
() -> processTaskRelationService.createProcessTaskRelationV2(user, taskRelationCreateRequest));
Assertions.assertEquals(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR.getCode(),
((ServiceException) exception).getCode());
// success
Mockito.when(processTaskRelationLogMapper.insert(isA(ProcessTaskRelationLog.class))).thenReturn(1);
Assertions.assertDoesNotThrow(
() -> processTaskRelationService.createProcessTaskRelationV2(user, taskRelationCreateRequest));
}
@Test
public void testDeleteTaskProcessRelationV2() {
TaskRelationCreateRequest taskRelationCreateRequest = new TaskRelationCreateRequest();
taskRelationCreateRequest.setWorkflowCode(PROCESS_DEFINITION_CODE);
// error task relation size
Mockito.when(
processTaskRelationMapper.filterProcessTaskRelation(isA(Page.class), isA(ProcessTaskRelation.class)))
.thenReturn(getMultiProcessTaskRelations());
exception = Assertions.assertThrows(ServiceException.class, () -> processTaskRelationService
.deleteTaskProcessRelationV2(user, UPSTREAM_TASK_CODE, DOWNSTREAM_TASK_CODE));
Assertions.assertEquals(Status.PROCESS_TASK_RELATION_NOT_EXPECT.getCode(),
((ServiceException) exception).getCode());
// success
Mockito.when(
processTaskRelationMapper.filterProcessTaskRelation(isA(Page.class), isA(ProcessTaskRelation.class)))
.thenReturn(getOneProcessTaskRelation());
Assertions.assertDoesNotThrow(() -> processTaskRelationService.deleteTaskProcessRelationV2(user,
UPSTREAM_TASK_CODE, DOWNSTREAM_TASK_CODE));
}
private IPage<ProcessTaskRelation> getOneProcessTaskRelation() {
IPage<ProcessTaskRelation> processTaskRelationIPage = new Page<>();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
processTaskRelation.setPreTaskCode(UPSTREAM_TASK_CODE);
processTaskRelation.setPostTaskCode(DOWNSTREAM_TASK_CODE);
processTaskRelationIPage.setRecords(Collections.singletonList(processTaskRelation));
return processTaskRelationIPage;
}
private IPage<ProcessTaskRelation> getMultiProcessTaskRelations() {
IPage<ProcessTaskRelation> processTaskRelationIPage = new Page<>();
List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
ProcessTaskRelation processTaskRelation0 = new ProcessTaskRelation();
processTaskRelation0.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
processTaskRelation0.setPreTaskCode(UPSTREAM_TASK_CODE);
processTaskRelation0.setPostTaskCode(DOWNSTREAM_TASK_CODE);
processTaskRelations.add(processTaskRelation0);
ProcessTaskRelation processTaskRelation1 = new ProcessTaskRelation();
processTaskRelation1.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
processTaskRelation1.setPreTaskCode(UPSTREAM_TASK_CODE);
processTaskRelation1.setPostTaskCode(DOWNSTREAM_TASK_CODE);
processTaskRelations.add(processTaskRelation1);
processTaskRelationIPage.setRecords(processTaskRelations);
return processTaskRelationIPage;
}
}

456
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -17,17 +17,34 @@
package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
import static org.mockito.ArgumentMatchers.isA;
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest;
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.TaskDefinitionServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@ -42,15 +59,15 @@ import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*;
@RunWith(MockitoJUnitRunner.class)
public class TaskDefinitionServiceImplTest {
@ -78,22 +95,47 @@ public class TaskDefinitionServiceImplTest {
@Mock
private TaskPluginManager taskPluginManager;
@Test
public void createTaskDefinition() {
long projectCode = 1L;
@Mock
private ProcessTaskRelationService processTaskRelationService;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@Mock
private ProcessDefinitionService processDefinitionService;
private static final String TASK_PARAMETER =
"{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";;
private static final long PROJECT_CODE = 1L;
private static final long PROCESS_DEFINITION_CODE = 2L;
private static final long TASK_CODE = 3L;
private static final int VERSION = 1;
private static final int RESOURCE_RATE = -1;
protected User user;
protected Exception exception;
@Before
public void before() {
User loginUser = new User();
loginUser.setId(-1);
loginUser.setId(1);
loginUser.setTenantId(2);
loginUser.setUserType(UserType.GENERAL_USER);
loginUser.setUserName("admin");
user = loginUser;
}
@Test
public void createTaskDefinition() {
Project project = getProject();
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_CREATE)).thenReturn(result);
putMsg(result, Status.SUCCESS, PROJECT_CODE);
Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION_CREATE))
.thenReturn(result);
String createTaskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
String createTaskDefinitionJson =
"[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
@ -101,125 +143,108 @@ public class TaskDefinitionServiceImplTest {
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
List<TaskDefinitionLog> taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class);
Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions, Boolean.TRUE)).thenReturn(1);
Mockito.when(processService.saveTaskDefine(user, PROJECT_CODE, taskDefinitions, Boolean.TRUE))
.thenReturn(1);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
Map<String, Object> relation = taskDefinitionService
.createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson);
.createTaskDefinition(user, PROJECT_CODE, createTaskDefinitionJson);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
@Test
public void updateTaskDefinition() {
String taskDefinitionJson = "{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
String taskDefinitionJson =
"{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}";
long projectCode = 1L;
long taskCode = 1L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Project project = getProject();
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION_UPDATE)).thenReturn(result);
putMsg(result, Status.SUCCESS, PROJECT_CODE);
Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION_UPDATE))
.thenReturn(result);
Mockito.when(processService.isTaskOnline(taskCode)).thenReturn(Boolean.FALSE);
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(new TaskDefinition());
Mockito.when(processService.isTaskOnline(TASK_CODE)).thenReturn(Boolean.FALSE);
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new TaskDefinition());
Mockito.when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode)).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, taskCode, taskDefinitionJson);
result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test
public void queryTaskDefinitionByName() {
String taskName = "task";
long projectCode = 1L;
long processCode = 1L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Project project = getProject();
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION )).thenReturn(result);
putMsg(result, Status.SUCCESS, PROJECT_CODE);
Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION))
.thenReturn(result);
Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName))
Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), PROCESS_DEFINITION_CODE, taskName))
.thenReturn(new TaskDefinition());
Map<String, Object> relation = taskDefinitionService
.queryTaskDefinitionByName(loginUser, projectCode, processCode, taskName);
.queryTaskDefinitionByName(user, PROJECT_CODE, PROCESS_DEFINITION_CODE, taskName);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
@Test
public void deleteTaskDefinitionByCode() {
long projectCode = 1L;
long taskCode = 1L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION_DELETE )).thenReturn(result);
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
Mockito.when(processTaskRelationMapper.queryDownstreamByTaskCode(taskCode))
.thenReturn(new ArrayList<>());
Mockito.when(taskDefinitionMapper.deleteByCode(taskCode))
.thenReturn(1);
Map<String, Object> relation = taskDefinitionService
.deleteTaskDefinitionByCode(loginUser, projectCode, taskCode);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
Project project = getProject();
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
// error task definition not find
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE));
Assertions.assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// error delete single task definition object
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition());
Mockito.when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(0);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE));
Assertions.assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR.getCode(),
((ServiceException) exception).getCode());
// success
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, project,
TASK_DEFINITION_DELETE);
Mockito.when(processTaskRelationMapper.queryDownstreamByTaskCode(TASK_CODE)).thenReturn(new ArrayList<>());
Mockito.when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(1);
Assertions.assertDoesNotThrow(() -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE));
}
@Test
public void switchVersion() {
int version = 1;
long taskCode = 11L;
long projectCode = 1L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Project project = getProject();
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_SWITCH_TO_THIS_VERSION)).thenReturn(result);
putMsg(result, Status.SUCCESS, PROJECT_CODE);
Mockito.when(
projectService.checkProjectAndAuth(user, project, PROJECT_CODE, WORKFLOW_SWITCH_TO_THIS_VERSION))
.thenReturn(result);
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version))
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, VERSION))
.thenReturn(new TaskDefinitionLog());
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
Mockito.when(taskDefinitionMapper.queryByCode(taskCode))
taskDefinition.setProjectCode(PROJECT_CODE);
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE))
.thenReturn(taskDefinition);
Mockito.when(taskDefinitionMapper.updateById(new TaskDefinitionLog())).thenReturn(1);
Map<String, Object> relation = taskDefinitionService
.switchVersion(loginUser, projectCode, taskCode, version);
.switchVersion(user, PROJECT_CODE, TASK_CODE, VERSION);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
@ -233,33 +258,10 @@ public class TaskDefinitionServiceImplTest {
}
}
/**
* get mock Project
*
* @param projectCode projectCode
* @return Project
*/
private Project getProject(long projectCode) {
Project project = new Project();
project.setId(1);
project.setCode(projectCode);
project.setName("test");
project.setUserId(1);
return project;
}
private TaskDefinition getTaskDefinition() {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(1L);
taskDefinition.setCode(1L);
taskDefinition.setVersion(1);
taskDefinition.setTaskType("SHELL");
return taskDefinition;
}
@Test
public void checkJson() {
String taskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
String taskDefinitionJson =
"[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
@ -268,7 +270,8 @@ public class TaskDefinitionServiceImplTest {
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
Assert.assertFalse(taskDefinitionLogs.isEmpty());
String taskJson = "[{\"name\":\"shell1\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
String taskJson =
"[{\"name\":\"shell1\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
+ "\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},"
+ "\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\","
+ "\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"},{\"name\":\"shell2\",\"description\":\"\","
@ -277,12 +280,14 @@ public class TaskDefinitionServiceImplTest {
+ "\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"}]";
taskDefinitionLogs = JSONUtils.toList(taskJson, TaskDefinitionLog.class);
Assert.assertFalse(taskDefinitionLogs.isEmpty());
String taskParams = "{\"resourceList\":[],\"localParams\":[{\"prop\":\"datetime\",\"direct\":\"IN\",\"type\":\"VARCHAR\","
String taskParams =
"{\"resourceList\":[],\"localParams\":[{\"prop\":\"datetime\",\"direct\":\"IN\",\"type\":\"VARCHAR\","
+ "\"value\":\"${system.datetime}\"}],\"rawScript\":\"echo ${datetime}\",\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
+ "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":{}}";
Map parameters = JSONUtils.parseObject(taskParams, Map.class);
Assert.assertNotNull(parameters);
String params = "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";
String params =
"{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";
Map parameters1 = JSONUtils.parseObject(params, Map.class);
Assert.assertNotNull(parameters1);
}
@ -295,43 +300,238 @@ public class TaskDefinitionServiceImplTest {
@Test
public void testReleaseTaskDefinition() {
long projectCode = 1L;
long taskCode = 1L;
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Project project = getProject(projectCode);
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.GENERAL_USER);
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
Project project = getProject();
// check task dose not exist
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result);
Map<String, Object> map = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.OFFLINE);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, TASK_CODE);
Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, null)).thenReturn(result);
Map<String, Object> map =
taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.OFFLINE);
Assert.assertEquals(Status.TASK_DEFINE_NOT_EXIST, map.get(Constants.STATUS));
// process definition offline
putMsg(result, Status.SUCCESS);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setProjectCode(PROJECT_CODE);
taskDefinition.setVersion(1);
taskDefinition.setCode(taskCode);
String params = "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";
taskDefinition.setCode(TASK_CODE);
String params =
"{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";
taskDefinition.setTaskParams(params);
taskDefinition.setTaskType("SHELL");
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition);
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition);
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition);
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinition.getVersion())).thenReturn(taskDefinitionLog);
Map<String, Object> offlineTaskResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.OFFLINE);
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, taskDefinition.getVersion()))
.thenReturn(taskDefinitionLog);
Map<String, Object> offlineTaskResult =
taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS, offlineTaskResult.get(Constants.STATUS));
// process definition online, resource exist
Map<String, Object> onlineTaskResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.ONLINE);
Map<String, Object> onlineTaskResult =
taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineTaskResult.get(Constants.STATUS));
// release error code
Map<String, Object> failResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.getEnum(2));
Map<String, Object> failResult =
taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.getEnum(2));
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS));
}
@Test
public void testCreateTaskDefinitionV2() {
TaskCreateRequest taskCreateRequest = new TaskCreateRequest();
taskCreateRequest.setProjectCode(PROJECT_CODE);
taskCreateRequest.setWorkflowCode(PROCESS_DEFINITION_CODE);
// error process definition not find
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// error project not find
Mockito.when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition());
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
.checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_CREATE);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// error task definition
taskCreateRequest.setTaskParams(TASK_PARAMETER);
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(),
TASK_DEFINITION_CREATE);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
Assertions.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID.getCode(),
((ServiceException) exception).getCode());
// error create task definition object
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
Mockito.when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(0);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
Assertions.assertEquals(Status.CREATE_TASK_DEFINITION_ERROR.getCode(),
((ServiceException) exception).getCode());
// error sync to task definition log
Mockito.when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
Assertions.assertEquals(Status.CREATE_TASK_DEFINITION_LOG_ERROR.getCode(),
((ServiceException) exception).getCode());
// success
Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
// we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
Mockito.when(processTaskRelationService.updateUpstreamTaskDefinition(isA(User.class), isA(Long.class),
isA(TaskRelationUpdateUpstreamRequest.class))).thenReturn(getProcessTaskRelationList());
Mockito.when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class),
isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition());
Assertions.assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
}
@Test
public void testUpdateTaskDefinitionV2() {
TaskUpdateRequest taskUpdateRequest = new TaskUpdateRequest();
// error task definition not exists
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.TASK_DEFINITION_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode());
// error project not find
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition());
Mockito.when(projectMapper.queryByCode(isA(Long.class))).thenReturn(getProject());
Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
.checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_UPDATE);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// error task definition
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(),
TASK_DEFINITION_UPDATE);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID.getCode(),
((ServiceException) exception).getCode());
// error task definition already online
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID.getCode(),
((ServiceException) exception).getCode());
// error task definition nothing update
Mockito.when(processService.isTaskOnline(TASK_CODE)).thenReturn(false);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.TASK_DEFINITION_NOT_CHANGE.getCode(), ((ServiceException) exception).getCode());
// error task definition version invalid
taskUpdateRequest.setTaskPriority(String.valueOf(Priority.HIGH));
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.DATA_IS_NOT_VALID.getCode(), ((ServiceException) exception).getCode());
// error task definition update effect number
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(VERSION);
Mockito.when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(0);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.UPDATE_TASK_DEFINITION_ERROR.getCode(),
((ServiceException) exception).getCode());
// error task definition log insert
Mockito.when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.CREATE_TASK_DEFINITION_LOG_ERROR.getCode(),
((ServiceException) exception).getCode());
// success
Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
// we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
Mockito.when(processTaskRelationService.updateUpstreamTaskDefinition(isA(User.class), isA(Long.class),
isA(TaskRelationUpdateUpstreamRequest.class))).thenReturn(getProcessTaskRelationList());
Assertions.assertDoesNotThrow(
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
}
@Test
public void testGetTaskDefinition() {
// error task definition not exists
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE));
Assertions.assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// error task definition not exists
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition());
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService)
.checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE));
Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
((ServiceException) exception).getCode());
// success
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION);
Assertions.assertDoesNotThrow(() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE));
}
/**
* get mock Project
*
* @return Project
*/
private Project getProject() {
Project project = new Project();
project.setId(1);
project.setCode(PROJECT_CODE);
project.setName("test");
project.setUserId(1);
return project;
}
private ProcessDefinition getProcessDefinition() {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setProjectCode(PROJECT_CODE);
processDefinition.setCode(PROCESS_DEFINITION_CODE);
processDefinition.setVersion(VERSION);
return processDefinition;
}
private TaskDefinition getTaskDefinition() {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(PROJECT_CODE);
taskDefinition.setCode(TASK_CODE);
taskDefinition.setVersion(VERSION);
taskDefinition.setTaskType("SHELL");
taskDefinition.setTaskParams(TASK_PARAMETER);
taskDefinition.setFlag(Flag.YES);
taskDefinition.setCpuQuota(RESOURCE_RATE);
taskDefinition.setMemoryMax(RESOURCE_RATE);
return taskDefinition;
}
private List<ProcessTaskRelation> getProcessTaskRelationList() {
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(PROJECT_CODE);
processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
processTaskRelation.setPreTaskCode(TASK_CODE);
processTaskRelation.setPostTaskCode(TASK_CODE + 1L);
processTaskRelationList.add(processTaskRelation);
return processTaskRelationList;
}
}

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java

@ -42,7 +42,6 @@ public class ProcessDefinitionLog extends ProcessDefinition {
}
public ProcessDefinitionLog(ProcessDefinition processDefinition) {
this.setId(processDefinition.getId());
this.setCode(processDefinition.getCode());
this.setName(processDefinition.getName());
this.setVersion(processDefinition.getVersion());

26
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java

@ -113,9 +113,7 @@ public class ProcessTaskRelation {
long postTaskCode,
int postTaskVersion,
ConditionType conditionType,
String conditionParams,
Date createTime,
Date updateTime) {
String conditionParams) {
this.name = name;
this.processDefinitionVersion = processDefinitionVersion;
this.projectCode = projectCode;
@ -126,8 +124,26 @@ public class ProcessTaskRelation {
this.postTaskVersion = postTaskVersion;
this.conditionType = conditionType;
this.conditionParams = conditionParams;
this.createTime = createTime;
this.updateTime = updateTime;
Date now = new Date();
this.createTime = now;
this.updateTime = now;
}
public ProcessTaskRelation(ProcessTaskRelationLog processTaskRelationLog) {
this.name = processTaskRelationLog.getName();
this.processDefinitionVersion = processTaskRelationLog.getProcessDefinitionVersion();
this.projectCode = processTaskRelationLog.getProjectCode();
this.processDefinitionCode = processTaskRelationLog.getProcessDefinitionCode();
this.preTaskCode = processTaskRelationLog.getPreTaskCode();
this.preTaskVersion = processTaskRelationLog.getPreTaskVersion();
this.postTaskCode = processTaskRelationLog.getPostTaskCode();
this.postTaskVersion = processTaskRelationLog.getPostTaskVersion();
this.conditionType = processTaskRelationLog.getConditionType();
this.conditionParams = processTaskRelationLog.getConditionParams();
this.createTime = processTaskRelationLog.getCreateTime();
this.updateTime = new Date();
}
}

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java

@ -43,7 +43,6 @@ public class ProcessTaskRelationLog extends ProcessTaskRelation {
public ProcessTaskRelationLog(ProcessTaskRelation processTaskRelation) {
super();
this.setId(processTaskRelation.getId());
this.setName(processTaskRelation.getName());
this.setProcessDefinitionCode(processTaskRelation.getProcessDefinitionCode());
this.setProcessDefinitionVersion(processTaskRelation.getProcessDefinitionVersion());

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java

@ -43,7 +43,6 @@ public class TaskDefinitionLog extends TaskDefinition {
public TaskDefinitionLog(TaskDefinition taskDefinition) {
super();
this.setId(taskDefinition.getId());
this.setCode(taskDefinition.getCode());
this.setVersion(taskDefinition.getVersion());
this.setName(taskDefinition.getName());

23
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -30,6 +30,7 @@ import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
/**
* process task relation mapper interface
@ -86,7 +87,7 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @param taskRelationList taskRelationList
* @return int
*/
int batchInsert(@Param("taskRelationList") List<ProcessTaskRelationLog> taskRelationList);
int batchInsert(@Param("taskRelationList") List<ProcessTaskRelation> taskRelationList);
/**
* query downstream process task relation by taskCode
@ -103,7 +104,8 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @param taskCode taskCode
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryUpstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode);
List<ProcessTaskRelation> queryUpstreamByCode(@Param("projectCode") long projectCode,
@Param("taskCode") long taskCode);
/**
* query downstream process task relation by taskCode
@ -112,7 +114,8 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @param taskCode taskCode
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode);
List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long projectCode,
@Param("taskCode") long taskCode);
/**
* query task relation by codes
@ -122,7 +125,9 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @param preTaskCodes preTaskCode list
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode, @Param("preTaskCodes") Long[] preTaskCodes);
List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long projectCode,
@Param("taskCode") long taskCode,
@Param("preTaskCodes") Long[] preTaskCodes);
/**
* query process task relation by process definition code
@ -197,5 +202,13 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
*/
List<ProcessTaskRelation> queryDownstreamByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
/**
* Filter process task relation
*
* @param page page
* @param processTaskRelation process definition object
* @return process task relation IPage
*/
IPage<ProcessTaskRelation> filterProcessTaskRelation(IPage<ProcessTaskRelation> page,
@Param("relation") ProcessTaskRelation processTaskRelation);
}

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java

@ -122,7 +122,7 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
@Param("searchWorkflowName") String searchWorkflowName,
@Param("searchTaskName") String searchTaskName,
@Param("taskType") String taskType,
@Param("taskExecuteType")TaskExecuteType taskExecuteType);
@Param("taskExecuteType") TaskExecuteType taskExecuteType);
/**
* query task definition by code list
@ -131,4 +131,14 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
* @return task definition list
*/
List<TaskDefinition> queryByCodeList(@Param("codes") Collection<Long> codes);
/**
* Filter task definition
*
* @param page page
* @param taskDefinition process definition object
* @return task definition IPage
*/
IPage<TaskDefinition> filterTaskDefinition(IPage<TaskDefinition> page,
@Param("task") TaskDefinition taskDefinition);
}

23
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -200,4 +200,27 @@
and post_task_code = #{postTaskCode}
</if>
</select>
<select id="filterProcessTaskRelation"
parameterType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
SELECT
<include refid="baseSql"/>
FROM t_ds_process_task_relation
<where>
<if test=" relation.projectCode != 0">
and project_code = #{relation.projectCode}
</if>
<if test=" relation.processDefinitionCode != 0">
and process_definition_code = #{relation.processDefinitionCode}
</if>
<if test=" relation.preTaskCode != 0">
and pre_task_code = #{relation.preTaskCode}
</if>
<if test=" relation.postTaskCode != 0">
and post_task_code = #{relation.postTaskCode}
</if>
</where>
order by update_time desc, id asc
</select>
</mapper>

20
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -135,4 +135,24 @@
</foreach>
</if>
</select>
<select id="filterTaskDefinition"
parameterType="org.apache.dolphinscheduler.dao.entity.TaskDefinition"
resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition"
>
SELECT
<include refid="baseSql"/>
FROM t_ds_task_definition
<where>
<if test=" task.projectCode != 0">
and project_code = #{task.projectCode}
</if>
<if test=" task.name != null and task.name != ''">
and name like concat('%', #{task.name}, '%')
</if>
<if test=" task.taskType != null and task.taskType != ''">
and task_type = #{task.taskType}
</if>
</where>
order by update_time desc, id asc
</select>
</mapper>

17
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -2448,16 +2448,17 @@ public class ProcessServiceImpl implements ProcessService {
if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode());
}
List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper
.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
int batchInsert = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
List<ProcessTaskRelation> processTaskRelationListFromLog = processTaskRelationLogMapper
.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()).stream()
.map(ProcessTaskRelation::new).collect(Collectors.toList());
int batchInsert = processTaskRelationMapper.batchInsert(processTaskRelationListFromLog);
if (batchInsert == 0) {
return Constants.EXIT_CODE_FAILURE;
} else {
int result = 0;
for (ProcessTaskRelationLog taskRelationLog : processTaskRelationLogList) {
int switchResult = switchTaskDefinitionVersion(taskRelationLog.getPostTaskCode(),
taskRelationLog.getPostTaskVersion());
for (ProcessTaskRelation taskRelation : processTaskRelationListFromLog) {
int switchResult = switchTaskDefinitionVersion(taskRelation.getPostTaskCode(),
taskRelation.getPostTaskVersion());
if (switchResult != Constants.EXIT_CODE_FAILURE) {
result++;
}
@ -2673,7 +2674,9 @@ public class ProcessServiceImpl implements ProcessService {
}
processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
}
insert = processTaskRelationMapper.batchInsert(taskRelationList);
List<ProcessTaskRelation> processTaskRelations =
taskRelationList.stream().map(ProcessTaskRelation::new).collect(Collectors.toList());
insert = processTaskRelationMapper.batchInsert(processTaskRelations);
}
int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;

Loading…
Cancel
Save