Browse Source

to #6957 implement interface deleteUpstreamRelation deleteDownstreamRelation (#6966)

* to #6957

* to #6957

* to #6957

* to #6957: group by process definition code

* to #6957: fix sonar

* to #6957: fix sonar

* to #6957: add ut , add preTaskVersion

* to #6957: fix style

* to #6957: use org.apache.dolphinscheduler.spi.utils.StringUtils

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
3.0.0/version-upgrade
zwZjut 3 years ago committed by GitHub
parent
commit
1edd2bd0f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 107
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  2. 79
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
  3. 40
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  4. 56
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

107
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils;
@ -50,6 +51,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.google.common.collect.Lists;
/**
* process task relation service impl
*/
@ -210,7 +213,22 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
*/
@Override
public Map<String, Object> deleteUpstreamRelation(User loginUser, long projectCode, String preTaskCodes, long taskCode) {
return null;
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
if (StringUtils.isEmpty(preTaskCodes)) {
putMsg(result,Status.DATA_IS_NULL,"preTaskCodes");
return result;
}
Set<Long> preTaskCodesSet = Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
Status status = deleteUpstreamRelation(projectCode, preTaskCodesSet.toArray(new Long[0]), taskCode);
if (status != Status.SUCCESS) {
putMsg(result, status);
}
return result;
}
/**
@ -224,7 +242,35 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
*/
@Override
public Map<String, Object> deleteDownstreamRelation(User loginUser, long projectCode, String postTaskCodes, long taskCode) {
return null;
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
if (StringUtils.isEmpty(postTaskCodes)) {
putMsg(result,Status.DATA_IS_NULL,"postTaskCodes");
return result;
}
Set<Long> postTaskCodesSet = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
List<Long> deleteFailedCodeList = new ArrayList<>();
postTaskCodesSet.stream().forEach(
postTaskCode -> {
try {
Status status = deleteUpstreamRelation(projectCode, new Long[]{taskCode}, postTaskCode);
if (Status.SUCCESS != status) {
deleteFailedCodeList.add(postTaskCode);
}
} catch (Exception e) {
deleteFailedCodeList.add(postTaskCode);
}
}
);
if (!deleteFailedCodeList.isEmpty()) {
putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR, String.join(",", deleteFailedCodeList.stream().map(o -> o + "").collect(Collectors.toList())));
}
return result;
}
/**
@ -328,4 +374,61 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
};
}
/**
* delete upstream relation
*
* @param projectCode project code
* @param preTaskCodes pre task codes
* @param taskCode pre task code
* @return status
*/
private Status deleteUpstreamRelation(long projectCode, Long[] preTaskCodes, long taskCode) {
List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes);
if (CollectionUtils.isEmpty(upstreamList)) {
return Status.SUCCESS;
}
Map<Long, List<ProcessTaskRelation>> processTaskRelationListGroupByProcessDefinitionCode = upstreamList.stream()
.collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode));
// count upstream relation group by process definition code
List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = processTaskRelationMapper
.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new Long[0]), taskCode);
List<ProcessTaskRelation> deletes = new ArrayList<>();
List<ProcessTaskRelation> updates = new ArrayList<>();
countListGroupByProcessDefinitionCode.stream().forEach(
processDefinitionCodeUpstreamCountMap ->
processDefinitionCodeUpstreamCountMap.entrySet().stream().forEach(
o -> {
Long processDefinitionCode = o.getKey();
Integer count = o.getValue();
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
if (count <= processTaskRelationList.size()) {
ProcessTaskRelation processTaskRelation = processTaskRelationList.remove(0);
if (processTaskRelation.getPreTaskCode() != 0) {
processTaskRelation.setPreTaskCode(0);
processTaskRelation.setPreTaskVersion(0);
updates.add(processTaskRelation);
}
}
if (!processTaskRelationList.isEmpty()) {
deletes.addAll(processTaskRelationList);
}
}
)
);
int update = 0;
if (!updates.isEmpty()) {
update = processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(updates);
}
int delete = 0;
if (!deletes.isEmpty()) {
delete = processTaskRelationMapper.deleteBatchIds(deletes.stream().map(ProcessTaskRelation::getId).collect(Collectors.toList()));
}
if (update < 0 || delete < 0) {
return Status.DELETE_TASK_PROCESS_RELATION_ERROR;
}
return Status.SUCCESS;
}
}

79
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java

@ -186,15 +186,18 @@ public class ProcessTaskRelationServiceTest {
processTaskRelationUpstream0.setPreTaskVersion(1);
processTaskRelationUpstream0.setProjectCode(projectCode);
processTaskRelationUpstream0.setPreTaskCode(123);
processTaskRelationUpstream0.setProcessDefinitionCode(123);
ProcessTaskRelation processTaskRelationUpstream1 = new ProcessTaskRelation();
processTaskRelationUpstream1.setPostTaskCode(taskCode);
processTaskRelationUpstream1.setPreTaskVersion(1);
processTaskRelationUpstream1.setPreTaskCode(123);
processTaskRelationUpstream0.setProcessDefinitionCode(124);
processTaskRelationUpstream1.setProjectCode(projectCode);
ProcessTaskRelation processTaskRelationUpstream2 = new ProcessTaskRelation();
processTaskRelationUpstream2.setPostTaskCode(taskCode);
processTaskRelationUpstream2.setPreTaskVersion(2);
processTaskRelationUpstream1.setPreTaskCode(123);
processTaskRelationUpstream0.setProcessDefinitionCode(125);
processTaskRelationUpstream2.setProjectCode(projectCode);
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
processTaskRelationList.add(processTaskRelationUpstream0);
@ -382,4 +385,80 @@ public class ProcessTaskRelationServiceTest {
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
Assert.assertEquals(2, ((List) relation.get("data")).size());
}
@Test
public void testDeleteDownstreamRelation() {
long projectCode = 1L;
long taskCode = 2L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
List<ProcessTaskRelation> processTaskRelationList = getProcessTaskUpstreamRelationList(projectCode, taskCode);
Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(processTaskRelationList);
List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = new ArrayList<>();
countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() {
{
put(123L, 2);
}
});
countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() {
{
put(124L, 1);
}
});
countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() {
{
put(125L, 3);
}
});
Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, new Long[]{123L, 124L, 125L}, 2)).thenReturn(countListGroupByProcessDefinitionCode);
Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new ArrayList())).thenReturn(3);
Mockito.when(processTaskRelationMapper.deleteBatchIds(new ArrayList())).thenReturn(3);
Map<String, Object> result1 = processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode, "123", taskCode);
Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
}
@Test
public void testDeleteUpstreamRelation() {
long projectCode = 1L;
long taskCode = 2L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
List<ProcessTaskRelation> processTaskRelationList = getProcessTaskUpstreamRelationList(projectCode, taskCode);
Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(processTaskRelationList);
List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = new ArrayList<>();
countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() {
{
put(123L, 2);
}
});
countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() {
{
put(124L, 1);
}
});
countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() {
{
put(125L, 3);
}
});
Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, new Long[]{123L, 124L, 125L}, 2)).thenReturn(countListGroupByProcessDefinitionCode);
Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new ArrayList())).thenReturn(3);
Mockito.when(processTaskRelationMapper.deleteBatchIds(new ArrayList())).thenReturn(3);
Map<String, Object> result1 = processTaskRelationService.deleteUpstreamRelation(loginUser, projectCode, "123", taskCode);
Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
}
}

40
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@ -101,6 +102,45 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
*/
List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode);
/**
* query task relation by codes
*
* @param projectCode projectCode
* @param taskCode taskCode
* @param postTaskCodes postTaskCodes list
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryDownstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode,@Param("postTaskCodes") Long[] postTaskCodes);
/**
* query task relation by codes
*
* @param projectCode projectCode
* @param taskCode taskCode
* @param preTaskCodes preTaskCode list
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode,@Param("preTaskCodes") Long[] preTaskCodes);
/**
* count upstream by codes
*
* @param projectCode projectCode
* @param taskCode taskCode
* @param processDefinitionCodes processDefinitionCodes
* @return upstream count list group by process definition code
*/
List<Map<Long, Integer>> countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long projectCode,
@Param("processDefinitionCodes") Long[] processDefinitionCodes, @Param("taskCode") long taskCode);
/**
* batch update process task relation pre task
*
* @param processTaskRelationList process task relation list
* @return update num
*/
int batchUpdateProcessTaskRelationPreTask(@Param("processTaskRelationList") List<ProcessTaskRelation> processTaskRelationList);
/**
* query by code
*

56
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -92,6 +92,62 @@
and post_task_code = #{taskCode}
</select>
<select id="queryDownstreamByCodes" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
WHERE project_code = #{projectCode}
and pre_task_code = #{taskCode}
<if test="postTaskCodes != null and postTaskCodes.length != 0">
and post_task_code in
<foreach collection="postTaskCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
</select>
<select id="queryUpstreamByCodes" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
WHERE project_code = #{projectCode}
and post_task_code = #{taskCode}
<if test="preTaskCodes != null and preTaskCodes.length != 0">
and pre_task_code in
<foreach collection="preTaskCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
</select>
<select id="countUpstreamByCodeGroupByProcessDefinitionCode" resultType="java.lang.Integer">
select process_definition_code,
count(0)
from t_ds_process_task_relation
WHERE project_code = #{projectCode}
and post_task_code = #{taskCode}
<if test="processDefinitionCodes != null and processDefinitionCodes.length != 0">
and process_definition_code in
<foreach collection="processDefinitionCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
group by process_definition_code
</select>
<update id="batchUpdateProcessTaskRelationPreTask" parameterType="java.util.List">
<foreach collection="processTaskRelationList" item="processTaskRelation" index="index" open="" close="" separator=";">
update t_ds_process_task_relation
<set>
pre_task_code=#{processTaskRelation.preTaskCode},
pre_task_version=#{processTaskRelation.preTaskVersion}
</set>
<where>
WHERE id = #{processTaskRelation.id}
</where>
</foreach>
</update>
<select id="queryByCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>

Loading…
Cancel
Save