Browse Source

to #7001: delete certain edge in processDefinition (#7038)

* to #7001: delete certain edge in processDefinition

* to #7001: delete certain edge in processDefinition

* to #7001: delete certain edge in processDefinition

* to #7001: delete certain edge in processDefinition

* to #7001: fix checkstyle

* to #7001: delete edge by projectCode,processDefinitionCode,preTaskCode and postTaskCode

* to #7001: add UT

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
3.0.0/version-upgrade
zwZjut 3 years ago committed by GitHub
parent
commit
b815da6158
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
  2. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  3. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
  4. 51
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  5. 30
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
  6. 14
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  7. 18
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

32
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_PROCESS_TASK_RELATION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.CREATE_PROCESS_TASK_RELATION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID; import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_EDGE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_PROCESS_RELATION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_PROCESS_RELATION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.MOVE_PROCESS_TASK_RELATION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.MOVE_PROCESS_TASK_RELATION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_PROCESS_RELATION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_PROCESS_RELATION_ERROR;
@ -253,4 +254,35 @@ public class ProcessTaskRelationController extends BaseController {
@PathVariable("taskCode") long taskCode) { @PathVariable("taskCode") long taskCode) {
return returnDataList(processTaskRelationService.queryDownstreamRelation(loginUser, projectCode, taskCode)); return returnDataList(processTaskRelationService.queryDownstreamRelation(loginUser, projectCode, taskCode));
} }
/**
* delete edge
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param preTaskCode pre task code
* @param postTaskCode post task code
* @return delete result code
*/
@ApiOperation(value = "deleteEdge", notes = "DELETE_EDGE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
@ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, type = "Long"),
@ApiImplicitParam(name = "preTaskCode", value = "PRE_TASK_CODE", required = true, type = "Long"),
@ApiImplicitParam(name = "postTaskCode", value = "POST_TASK_CODE", required = true, type = "Long")
})
@DeleteMapping(value = "/{processDefinitionCode}/{preTaskCode}/{postTaskCode}")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_EDGE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteEdge(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true)
@PathVariable long projectCode,
@PathVariable long processDefinitionCode,
@PathVariable long preTaskCode,
@PathVariable long postTaskCode) {
return returnDataList(processTaskRelationService.deleteEdge(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode));
}
} }

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -288,6 +288,7 @@ public enum Status {
TASK_HAS_DOWNSTREAM(50051, "Task [{0}] exists downstream dependence", "任务[{0}]存在下游依赖"), TASK_HAS_DOWNSTREAM(50051, "Task [{0}] exists downstream dependence", "任务[{0}]存在下游依赖"),
MAIN_TABLE_USING_VERSION(50052, "the version that the master table is using", "主表正在使用该版本"), MAIN_TABLE_USING_VERSION(50052, "the version that the master table is using", "主表正在使用该版本"),
PROJECT_PROCESS_NOT_MATCH(50053, "the project and the process is not match", "项目和工作流不匹配"), PROJECT_PROCESS_NOT_MATCH(50053, "the project and the process is not match", "项目和工作流不匹配"),
DELETE_EDGE_ERROR(50054, "delete edge error", "删除工作流任务连接线错误"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/** /**

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java

@ -123,4 +123,16 @@ public interface ProcessTaskRelationService {
Map<String, Object> queryDownstreamRelation(User loginUser, Map<String, Object> queryDownstreamRelation(User loginUser,
long projectCode, long projectCode,
long taskCode); long taskCode);
/**
* delete edge
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param preTaskCode pre task code
* @param postTaskCode post task code
* @return delete result code
*/
Map<String, Object> deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode);
} }

51
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -388,6 +388,57 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result; return result;
} }
/**
* delete edge
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param preTaskCode pre task code
* @param postTaskCode post task code
* @return delete result code
*/
@Override
public Map<String, Object> deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode);
if (CollectionUtils.isEmpty(processTaskRelationList)) {
putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
return result;
}
if (processTaskRelationList.size() > 1) {
putMsg(result, Status.DATA_IS_NOT_VALID, "processTaskRelationList");
return result;
}
ProcessTaskRelation processTaskRelation = processTaskRelationList.get(0);
int upstreamCount = processTaskRelationMapper.countByCode(projectCode, processTaskRelation.getProcessDefinitionCode(),
0L, processTaskRelation.getPostTaskCode());
if (upstreamCount == 0) {
putMsg(result, Status.DATA_IS_NULL, "upstreamCount");
return result;
}
if (upstreamCount > 1) {
int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId());
if (delete == 0) {
putMsg(result, Status.DELETE_EDGE_ERROR);
}
return result;
}
processTaskRelation.setPreTaskVersion(0);
processTaskRelation.setPreTaskCode(0L);
int update = processTaskRelationMapper.updateById(processTaskRelation);
if (update == 0) {
putMsg(result, Status.DELETE_EDGE_ERROR);
}
return result;
}
/** /**
* build task definition * build task definition
* *

30
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java

@ -492,4 +492,34 @@ public class ProcessTaskRelationServiceTest {
result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode); result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
} }
@Test
public void testDeleteEdge() {
long projectCode = 1L;
long processDefinitionCode = 3L;
long preTaskCode = 4L;
long postTaskCode = 5L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(projectCode);
processTaskRelation.setProcessDefinitionCode(processDefinitionCode);
processTaskRelation.setPreTaskCode(preTaskCode);
processTaskRelation.setPostTaskCode(postTaskCode);
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
processTaskRelationList.add(processTaskRelation);
Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(processTaskRelationList);
Mockito.when(processTaskRelationMapper.countByCode(projectCode, processDefinitionCode, 0L, postTaskCode)).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteById(processTaskRelation.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
result = processTaskRelationService.deleteEdge(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
} }

14
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -162,4 +162,18 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @return int * @return int
*/ */
int deleteRelation(@Param("processTaskRelationLog") ProcessTaskRelationLog processTaskRelationLog); int deleteRelation(@Param("processTaskRelationLog") ProcessTaskRelationLog processTaskRelationLog);
/**
* count by code
*
* @param projectCode projectCode
* @param processDefinitionCode processDefinitionCode
* @param preTaskCode preTaskCode
* @param postTaskCode postTaskCode
* @return ProcessTaskRelation
*/
int countByCode(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode,
@Param("preTaskCode") long preTaskCode,
@Param("postTaskCode") long postTaskCode);
} }

18
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -177,4 +177,22 @@
and post_task_code = #{processTaskRelationLog.postTaskCode} and post_task_code = #{processTaskRelationLog.postTaskCode}
and post_task_version = #{processTaskRelationLog.post_task_version} and post_task_version = #{processTaskRelationLog.post_task_version}
</delete> </delete>
<select id="countByCode" resultType="int">
select count(0)
from t_ds_process_task_relation
WHERE 1=1
<if test="projectCode != 0">
and project_code = #{projectCode}
</if>
<if test="processDefinitionCode != 0">
and process_definition_code = #{processDefinitionCode}
</if>
<if test="preTaskCode != 0">
and pre_task_code = #{preTaskCode}
</if>
<if test="postTaskCode != 0">
and post_task_code = #{postTaskCode}
</if>
</select>
</mapper> </mapper>

Loading…
Cancel
Save