diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 53b56e54b2..5a6aa4998f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -16,20 +16,20 @@ */ package org.apache.dolphinscheduler.dao.entity; +import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; import java.io.Serializable; import java.util.Date; -import java.util.List; +import java.util.Map; /** * task instance @@ -209,7 +209,7 @@ public class TaskInstance implements Serializable { @TableField(exist = false) - private List resources; + private Map resources; @@ -451,10 +451,14 @@ public class TaskInstance implements Serializable { || (this.getState().typeIsFailure() && !taskCanRetry()); } - public List getResources() { + public Map getResources() { return resources; } + public void setResources(Map resources) { + this.resources = resources; + } + public boolean isSubProcess(){ return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType)); } @@ -467,9 +471,7 @@ public class TaskInstance implements Serializable { return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType)); } - public void setResources(List resources) { - this.resources = resources; - } + /** * determine if you can try again diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 563f5c8459..41331abfe0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; -import java.util.List; import java.util.Map; /** @@ -168,9 +167,9 @@ public class TaskExecutionContext implements Serializable{ private String workerGroup; /** - * resources full name + * resources full name and tenant code */ - private List resources; + private Map resources; /** * sql TaskExecutionContext @@ -443,11 +442,11 @@ public class TaskExecutionContext implements Serializable{ this.dependenceTaskExecutionContext = dependenceTaskExecutionContext; } - public List getResources() { + public Map getResources() { return resources; } - public void setResources(List resources) { + public void setResources(Map resources) { this.resources = resources; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 904914a55a..ee48ca0f8a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.consumer; import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -32,7 +33,6 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; @@ -50,14 +50,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; - /** * TaskUpdateQueue consumer */ @@ -362,10 +358,10 @@ public class TaskPriorityQueueConsumer extends Thread{ } /** - * get resource full name list + * get resource map key is full name and value is tenantCode */ - private List getResourceFullNames(TaskNode taskNode) { - List resourceFullNameList = new ArrayList<>(); + private Map getResourceFullNames(TaskNode taskNode) { + Map resourceMap = new HashMap<>(); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); if (baseParam != null) { @@ -375,7 +371,10 @@ public class TaskPriorityQueueConsumer extends Thread{ // filter the resources that the resource id equals 0 Set oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet()); if (CollectionUtils.isNotEmpty(oldVersionResources)) { - resourceFullNameList.addAll(oldVersionResources.stream().map(resource -> resource.getRes()).collect(Collectors.toSet())); + + oldVersionResources.forEach( + (t)->resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)) + ); } // get the resource id in order to get the resource names in batch @@ -386,13 +385,13 @@ public class TaskPriorityQueueConsumer extends Thread{ Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); List resources = processService.listResourceByIds(resourceIds); - resourceFullNameList.addAll(resources.stream() - .map(resourceInfo -> resourceInfo.getFullName()) - .collect(Collectors.toList())); + resources.forEach( + (t)->resourceMap.put(t.getFullName(),processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)) + ); } } } - return resourceFullNameList; + return resourceMap; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index d2d783ac85..592060b0e8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -18,13 +18,16 @@ package org.apache.dolphinscheduler.server.worker.runner; import com.alibaba.fastjson.JSONObject; +import org.apache.commons.collections.MapUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.HadoopUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; @@ -94,7 +97,6 @@ public class TaskExecuteThread implements Runnable { // copy hdfs/minio file to local downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), - taskExecutionContext.getTenantCode(), logger); taskExecutionContext.setTaskParams(taskNode.getParams()); @@ -224,22 +226,25 @@ public class TaskExecuteThread implements Runnable { * @param logger */ private void downloadResource(String execLocalPath, - List projectRes, - String tenantCode, + Map projectRes, Logger logger) throws Exception { - if (CollectionUtils.isEmpty(projectRes)){ + if (MapUtils.isEmpty(projectRes)){ return; } - for (String resource : projectRes) { - File resFile = new File(execLocalPath, resource); + Set> resEntries = projectRes.entrySet(); + + for (Map.Entry resource : resEntries) { + String fullName = resource.getKey(); + String tenantCode = resource.getValue(); + File resFile = new File(execLocalPath, fullName); if (!resFile.exists()) { try { // query the tenant code of the resource according to the name of the resource - String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, resource); + String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, fullName); logger.info("get resource file from hdfs :{}", resHdfsPath); - HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resource, false, true); + HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true); }catch (Exception e){ logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage());