From 733e8bcb0daaec64d694b98bd38736bb703216e8 Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Fri, 24 Apr 2020 14:09:36 +0800 Subject: [PATCH] support download the resources existed in process definition which was old version (#2506) * fix #2442 and remove unavailable code * revert verifyResourceName method * Add ServiceException * add ServiceExceptionTest * update ServiceExceptionTest * add ServiceExceptionTest in pom * support download the resources existed in process definition which was old version --- .../consumer/TaskPriorityQueueConsumer.java | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index cdd9ff2219..480d6657c2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -48,13 +48,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; -import java.util.HashSet; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.dolphinscheduler.common.Constants.*; +import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; /** * TaskUpdateQueue consumer @@ -328,36 +328,38 @@ public class TaskPriorityQueueConsumer extends Thread{ return false; } - /** - * create project resource files + * get resource full name list */ - private List getResourceFullNames(TaskNode taskNode){ - - Set resourceIdsSet = new HashSet<>(); + private List getResourceFullNames(TaskNode taskNode) { + List resourceFullNameList = new ArrayList<>(); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); if (baseParam != null) { List projectResourceFiles = baseParam.getResourceFilesList(); if (projectResourceFiles != null) { - Stream resourceInfotream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId()); - resourceIdsSet.addAll(resourceInfotream.collect(Collectors.toSet())); - } - } + // filter the resources that the resource id equals 0 + Set oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet()); + if (CollectionUtils.isNotEmpty(oldVersionResources)) { + resourceFullNameList.addAll(oldVersionResources.stream().map(resource -> resource.getRes()).collect(Collectors.toSet())); + } - if (CollectionUtils.isEmpty(resourceIdsSet)){ - return null; - } - - Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); + // get the resource id in order to get the resource names in batch + Stream resourceIdStream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId()); + Set resourceIdsSet = resourceIdStream.collect(Collectors.toSet()); - List resources = processService.listResourceByIds(resourceIds); + if (CollectionUtils.isNotEmpty(resourceIdsSet)) { + Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); - List resourceFullNames = resources.stream() - .map(resourceInfo -> resourceInfo.getFullName()) - .collect(Collectors.toList()); + List resources = processService.listResourceByIds(resourceIds); + resourceFullNameList.addAll(resources.stream() + .map(resourceInfo -> resourceInfo.getFullName()) + .collect(Collectors.toList())); + } + } + } - return resourceFullNames; + return resourceFullNameList; } }