Browse Source

support download the resources existed in process definition which was old version (#2506)

* fix #2442 and remove unavailable code

* revert verifyResourceName method

* Add ServiceException

* add ServiceExceptionTest

* update ServiceExceptionTest

* add ServiceExceptionTest in pom

* support download the resources existed in process definition which was old version
pull/2/head
lgcareer 5 years ago committed by GitHub
parent
commit
733e8bcb0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 34
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

34
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -48,13 +48,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.HashSet; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.*; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/** /**
* TaskUpdateQueue consumer * TaskUpdateQueue consumer
@ -328,36 +328,38 @@ public class TaskPriorityQueueConsumer extends Thread{
return false; return false;
} }
/** /**
* create project resource files * get resource full name list
*/ */
private List<String> getResourceFullNames(TaskNode taskNode) { private List<String> getResourceFullNames(TaskNode taskNode) {
List<String> resourceFullNameList = new ArrayList<>();
Set<Integer> resourceIdsSet = new HashSet<>();
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
if (baseParam != null) { if (baseParam != null) {
List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList(); List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();
if (projectResourceFiles != null) { if (projectResourceFiles != null) {
Stream<Integer> resourceInfotream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId());
resourceIdsSet.addAll(resourceInfotream.collect(Collectors.toSet()));
} // filter the resources that the resource id equals 0
Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(oldVersionResources)) {
resourceFullNameList.addAll(oldVersionResources.stream().map(resource -> resource.getRes()).collect(Collectors.toSet()));
} }
if (CollectionUtils.isEmpty(resourceIdsSet)){ // get the resource id in order to get the resource names in batch
return null; Stream<Integer> resourceIdStream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId());
} Set<Integer> resourceIdsSet = resourceIdStream.collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(resourceIdsSet)) {
Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]);
List<Resource> resources = processService.listResourceByIds(resourceIds); List<Resource> resources = processService.listResourceByIds(resourceIds);
resourceFullNameList.addAll(resources.stream()
List<String> resourceFullNames = resources.stream()
.map(resourceInfo -> resourceInfo.getFullName()) .map(resourceInfo -> resourceInfo.getFullName())
.collect(Collectors.toList()); .collect(Collectors.toList()));
}
}
}
return resourceFullNames; return resourceFullNameList;
} }
} }

Loading…
Cancel
Save