Browse Source

[Bug-14105][Task] Replace default tenant when downloading resource (#14106)

3.2.0-release
Aaron Wang 2 years ago committed by GitHub
parent
commit
74ed7856bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java
  2. 7
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

2
dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java

@ -312,7 +312,7 @@ public class HdfsStorageOperator implements Closeable, StorageOperate {
} }
@Override @Override
public void download(String bucketName, String srcHdfsFilePath, String dstFile, boolean deleteSource, public void download(String tenantCode, String srcHdfsFilePath, String dstFile, boolean deleteSource,
boolean overwrite) throws IOException { boolean overwrite) throws IOException {
copyHdfsToLocal(srcHdfsFilePath, dstFile, deleteSource, overwrite); copyHdfsToLocal(srcHdfsFilePath, dstFile, deleteSource, overwrite);
} }

7
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

@ -110,13 +110,16 @@ public class TaskExecutionCheckerUtils {
TaskExecutionContext taskExecutionContext) { TaskExecutionContext taskExecutionContext) {
String execLocalPath = taskExecutionContext.getExecutePath(); String execLocalPath = taskExecutionContext.getExecutePath();
String tenant = taskExecutionContext.getTenantCode(); String tenant = taskExecutionContext.getTenantCode();
String actualTenant =
TenantConstants.DEFAULT_TENANT_CODE.equals(tenant) ? TenantConstants.BOOTSTRAPT_SYSTEM_USER : tenant;
Map<String, String> projectRes = taskExecutionContext.getResources(); Map<String, String> projectRes = taskExecutionContext.getResources();
if (MapUtils.isEmpty(projectRes)) { if (MapUtils.isEmpty(projectRes)) {
return; return;
} }
List<Pair<String, String>> downloadFiles = new ArrayList<>(); List<Pair<String, String>> downloadFiles = new ArrayList<>();
projectRes.keySet().forEach(fullName -> { projectRes.keySet().forEach(fullName -> {
String fileName = storageOperate.getResourceFileName(tenant, fullName); String fileName = storageOperate.getResourceFileName(actualTenant, fullName);
projectRes.put(fullName, fileName); projectRes.put(fullName, fileName);
File resFile = new File(execLocalPath, fileName); File resFile = new File(execLocalPath, fileName);
boolean notExist = !resFile.exists(); boolean notExist = !resFile.exists();
@ -138,7 +141,7 @@ public class TaskExecutionCheckerUtils {
log.info("get resource file from path:{}", fullName); log.info("get resource file from path:{}", fullName);
long resourceDownloadStartTime = System.currentTimeMillis(); long resourceDownloadStartTime = System.currentTimeMillis();
storageOperate.download(taskExecutionContext.getTenantCode(), fullName, storageOperate.download(actualTenant, fullName,
execLocalPath + File.separator + fileName, false, execLocalPath + File.separator + fileName, false,
true); true);
WorkerServerMetrics WorkerServerMetrics

Loading…
Cancel
Save