Browse Source

fix 2975:download resource need find the tenant code of resource owner (#3109)

pull/3/MERGE
lgcareer 4 years ago committed by GitHub
parent
commit
07a6aec1fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  2. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  3. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  4. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

20
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -16,20 +16,20 @@
*/ */
package org.apache.dolphinscheduler.dao.entity; package org.apache.dolphinscheduler.dao.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.Map;
/** /**
* task instance * task instance
@ -209,7 +209,7 @@ public class TaskInstance implements Serializable {
@TableField(exist = false) @TableField(exist = false)
private List<String> resources; private Map<String,String> resources;
@ -451,10 +451,14 @@ public class TaskInstance implements Serializable {
|| (this.getState().typeIsFailure() && !taskCanRetry()); || (this.getState().typeIsFailure() && !taskCanRetry());
} }
public List<String> getResources() { public Map<String, String> getResources() {
return resources; return resources;
} }
public void setResources(Map<String, String> resources) {
this.resources = resources;
}
public boolean isSubProcess(){ public boolean isSubProcess(){
return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType)); return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType));
} }
@ -467,9 +471,7 @@ public class TaskInstance implements Serializable {
return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType)); return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType));
} }
public void setResources(List<String> resources) {
this.resources = resources;
}
/** /**
* determine if you can try again * determine if you can try again

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
@ -168,9 +167,9 @@ public class TaskExecutionContext implements Serializable{
private String workerGroup; private String workerGroup;
/** /**
* resources full name * resources full name and tenant code
*/ */
private List<String> resources; private Map<String,String> resources;
/** /**
* sql TaskExecutionContext * sql TaskExecutionContext
@ -443,11 +442,11 @@ public class TaskExecutionContext implements Serializable{
this.dependenceTaskExecutionContext = dependenceTaskExecutionContext; this.dependenceTaskExecutionContext = dependenceTaskExecutionContext;
} }
public List<String> getResources() { public Map<String, String> getResources() {
return resources; return resources;
} }
public void setResources(List<String> resources) { public void setResources(Map<String, String> resources) {
this.resources = resources; this.resources = resources;
} }

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
@ -32,7 +33,6 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@ -50,14 +50,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/** /**
* TaskUpdateQueue consumer * TaskUpdateQueue consumer
*/ */
@ -362,10 +358,10 @@ public class TaskPriorityQueueConsumer extends Thread{
} }
/** /**
* get resource full name list * get resource map key is full name and value is tenantCode
*/ */
private List<String> getResourceFullNames(TaskNode taskNode) { private Map<String,String> getResourceFullNames(TaskNode taskNode) {
List<String> resourceFullNameList = new ArrayList<>(); Map<String,String> resourceMap = new HashMap<>();
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
if (baseParam != null) { if (baseParam != null) {
@ -375,7 +371,10 @@ public class TaskPriorityQueueConsumer extends Thread{
// filter the resources that the resource id equals 0 // filter the resources that the resource id equals 0
Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet()); Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(oldVersionResources)) { if (CollectionUtils.isNotEmpty(oldVersionResources)) {
resourceFullNameList.addAll(oldVersionResources.stream().map(resource -> resource.getRes()).collect(Collectors.toSet()));
oldVersionResources.forEach(
(t)->resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
);
} }
// get the resource id in order to get the resource names in batch // get the resource id in order to get the resource names in batch
@ -386,13 +385,13 @@ public class TaskPriorityQueueConsumer extends Thread{
Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]);
List<Resource> resources = processService.listResourceByIds(resourceIds); List<Resource> resources = processService.listResourceByIds(resourceIds);
resourceFullNameList.addAll(resources.stream() resources.forEach(
.map(resourceInfo -> resourceInfo.getFullName()) (t)->resourceMap.put(t.getFullName(),processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))
.collect(Collectors.toList())); );
} }
} }
} }
return resourceFullNameList; return resourceMap;
} }
} }

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -18,13 +18,16 @@ package org.apache.dolphinscheduler.server.worker.runner;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections.MapUtils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
@ -94,7 +97,6 @@ public class TaskExecuteThread implements Runnable {
// copy hdfs/minio file to local // copy hdfs/minio file to local
downloadResource(taskExecutionContext.getExecutePath(), downloadResource(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources(), taskExecutionContext.getResources(),
taskExecutionContext.getTenantCode(),
logger); logger);
taskExecutionContext.setTaskParams(taskNode.getParams()); taskExecutionContext.setTaskParams(taskNode.getParams());
@ -224,22 +226,25 @@ public class TaskExecuteThread implements Runnable {
* @param logger * @param logger
*/ */
private void downloadResource(String execLocalPath, private void downloadResource(String execLocalPath,
List<String> projectRes, Map<String,String> projectRes,
String tenantCode,
Logger logger) throws Exception { Logger logger) throws Exception {
if (CollectionUtils.isEmpty(projectRes)){ if (MapUtils.isEmpty(projectRes)){
return; return;
} }
for (String resource : projectRes) { Set<Map.Entry<String, String>> resEntries = projectRes.entrySet();
File resFile = new File(execLocalPath, resource);
for (Map.Entry<String,String> resource : resEntries) {
String fullName = resource.getKey();
String tenantCode = resource.getValue();
File resFile = new File(execLocalPath, fullName);
if (!resFile.exists()) { if (!resFile.exists()) {
try { try {
// query the tenant code of the resource according to the name of the resource // query the tenant code of the resource according to the name of the resource
String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, resource); String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, fullName);
logger.info("get resource file from hdfs :{}", resHdfsPath); logger.info("get resource file from hdfs :{}", resHdfsPath);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resource, false, true); HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true);
}catch (Exception e){ }catch (Exception e){
logger.error(e.getMessage(),e); logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage()); throw new RuntimeException(e.getMessage());

Loading…
Cancel
Save