Loading dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +23 −21 Original line number Diff line number Diff line Loading @@ -48,13 +48,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.HashSet; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.dolphinscheduler.common.Constants.*; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; /** * TaskUpdateQueue consumer Loading Loading @@ -328,36 +328,38 @@ public class TaskPriorityQueueConsumer extends Thread{ return false; } /** * create project resource files * get resource full name list */ private List<String> getResourceFullNames(TaskNode taskNode) { Set<Integer> resourceIdsSet = new HashSet<>(); List<String> resourceFullNameList = new ArrayList<>(); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); if (baseParam != null) { List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList(); if (projectResourceFiles != null) { Stream<Integer> resourceInfotream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId()); resourceIdsSet.addAll(resourceInfotream.collect(Collectors.toSet())); } // filter the resources that the resource id equals 0 Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet()); if (CollectionUtils.isNotEmpty(oldVersionResources)) { resourceFullNameList.addAll(oldVersionResources.stream().map(resource -> resource.getRes()).collect(Collectors.toSet())); } if (CollectionUtils.isEmpty(resourceIdsSet)){ return null; } // get the resource id in order to get the resource names in batch Stream<Integer> resourceIdStream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId()); Set<Integer> resourceIdsSet = resourceIdStream.collect(Collectors.toSet()); if (CollectionUtils.isNotEmpty(resourceIdsSet)) { Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); List<Resource> resources = processService.listResourceByIds(resourceIds); List<String> resourceFullNames = resources.stream() resourceFullNameList.addAll(resources.stream() .map(resourceInfo -> resourceInfo.getFullName()) .collect(Collectors.toList()); .collect(Collectors.toList())); } } } return resourceFullNames; return resourceFullNameList; } } Loading
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +23 −21 Original line number Diff line number Diff line Loading @@ -48,13 +48,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.HashSet; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.dolphinscheduler.common.Constants.*; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; /** * TaskUpdateQueue consumer Loading Loading @@ -328,36 +328,38 @@ public class TaskPriorityQueueConsumer extends Thread{ return false; } /** * create project resource files * get resource full name list */ private List<String> getResourceFullNames(TaskNode taskNode) { Set<Integer> resourceIdsSet = new HashSet<>(); List<String> resourceFullNameList = new ArrayList<>(); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); if (baseParam != null) { List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList(); if (projectResourceFiles != null) { Stream<Integer> resourceInfotream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId()); resourceIdsSet.addAll(resourceInfotream.collect(Collectors.toSet())); } // filter the resources that the resource id equals 0 Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet()); if (CollectionUtils.isNotEmpty(oldVersionResources)) { resourceFullNameList.addAll(oldVersionResources.stream().map(resource -> resource.getRes()).collect(Collectors.toSet())); } if (CollectionUtils.isEmpty(resourceIdsSet)){ return null; } // get the resource id in order to get the resource names in batch Stream<Integer> resourceIdStream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId()); Set<Integer> resourceIdsSet = resourceIdStream.collect(Collectors.toSet()); if (CollectionUtils.isNotEmpty(resourceIdsSet)) { Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); List<Resource> resources = processService.listResourceByIds(resourceIds); List<String> resourceFullNames = resources.stream() resourceFullNameList.addAll(resources.stream() .map(resourceInfo -> resourceInfo.getFullName()) .collect(Collectors.toList()); .collect(Collectors.toList())); } } } return resourceFullNames; return resourceFullNameList; } }