From 71fcea593782b7dc5d5ee4133e5bbadbefcf963d Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Wed, 16 Feb 2022 23:06:32 +0800 Subject: [PATCH] fix empty upstreamTaskCodes (#8405) Co-authored-by: caishunfeng <534328519@qq.com> --- .../impl/TaskDefinitionServiceImpl.java | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 54709697de..e722210688 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -54,6 +54,7 @@ import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -150,11 +151,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * create single task definition that binds the workflow * - * @param loginUser login user - * @param projectCode project code + * @param loginUser login user + * @param projectCode project code * @param processDefinitionCode process definition code * @param taskDefinitionJsonObj task definition json object - * @param upstreamCodes upstream task codes, sep comma + * @param upstreamCodes upstream task codes, sep comma * @return create result code */ @Transactional(rollbackFor = RuntimeException.class) @@ -227,7 +228,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList())); } int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(), - processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE); + processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE); if (insertResult != Constants.EXIT_CODE_SUCCESS) { putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); @@ -272,6 +273,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * delete task definition * Only offline and no downstream dependency can be deleted + * * @param loginUser login user * @param projectCode project code * @param taskCode task code @@ -301,9 +303,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe List processTaskRelationList = processTaskRelationMapper.queryDownstreamByTaskCode(taskCode); if (!processTaskRelationList.isEmpty()) { Set postTaskCodes = processTaskRelationList - .stream() - .map(ProcessTaskRelation::getPostTaskCode) - .collect(Collectors.toSet()); + .stream() + .map(ProcessTaskRelation::getPostTaskCode) + .collect(Collectors.toSet()); putMsg(result, Status.TASK_HAS_DOWNSTREAM, StringUtils.join(postTaskCodes, ",")); return result; } @@ -326,7 +328,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } private void updateDag(User loginUser, Map result, long processDefinitionCode, List processTaskRelationList, - List taskDefinitionLogs) { + List taskDefinitionLogs) { ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); if (processDefinition == null) { throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST); @@ -337,7 +339,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), - insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE); + insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE); if (insertResult == Constants.EXIT_CODE_SUCCESS) { putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); @@ -433,11 +435,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * update task definition and upstream * - * @param loginUser login user - * @param projectCode project code - * @param taskCode task definition code + * @param loginUser login user + * @param projectCode project code + * @param taskCode task definition code * @param taskDefinitionJsonObj task definition json object - * @param upstreamCodes upstream task codes, sep comma + * @param upstreamCodes upstream task codes, sep comma * @return update result code */ @Override @@ -449,7 +451,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } List upstreamTaskRelations = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); Set upstreamCodeSet = upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet()); - Set upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); + Set upstreamTaskCodes = Collections.emptySet(); + if (StringUtils.isNotEmpty(upstreamCodes)) { + upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); + } if (CollectionUtils.isEqualCollection(upstreamCodeSet, upstreamTaskCodes) && taskDefinitionToUpdate == null) { putMsg(result, Status.SUCCESS); return result; @@ -642,7 +647,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } Page page = new Page<>(pageNo, pageSize); IPage taskMainInfoIPage = taskDefinitionMapper.queryDefineListPaging(page, projectCode, searchWorkflowName, - searchTaskName, taskType == null ? StringUtils.EMPTY : taskType.getDesc()); + searchTaskName, taskType == null ? StringUtils.EMPTY : taskType.getDesc()); List records = taskMainInfoIPage.getRecords(); if (!records.isEmpty()) { Map taskMainInfoMap = new HashMap<>(); @@ -740,11 +745,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe String resourceIds = taskDefinition.getResourceIds(); if (StringUtils.isNotBlank(resourceIds)) { Integer[] resourceIdArray = Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new); - PermissionCheck permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID,processService,resourceIdArray,loginUser.getId(),logger); + PermissionCheck permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger); try { permissionCheck.checkPermission(); } catch (Exception e) { - logger.error(e.getMessage(),e); + logger.error(e.getMessage(), e); putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION); return result; }