From 0e88ea3ac8435391adf605f7d54c688e7b498687 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 25 Dec 2023 16:00:41 +0800 Subject: [PATCH] Recreate new TaskInstance Working Directory when exist in worker (#15358) --- .../worker/runner/WorkerTaskExecutor.java | 2 +- .../utils/TaskExecutionContextUtils.java | 35 +++++++++++-------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java index c335f34994..e47a28d264 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java @@ -217,7 +217,7 @@ public abstract class WorkerTaskExecutor implements Runnable { taskExecutionContext.setTenantCode(tenant); log.info("TenantCode: {} check successfully", taskExecutionContext.getTenantCode()); - TaskExecutionContextUtils.createProcessLocalPathIfAbsent(taskExecutionContext); + TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext); log.info("WorkflowInstanceExecDir: {} check successfully", taskExecutionContext.getExecutePath()); TaskChannel taskChannel = diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java index bbe3b1ab4b..dcfeec4e1c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java @@ -80,22 +80,29 @@ public class TaskExecutionContextUtils { } } - public static void createProcessLocalPathIfAbsent(TaskExecutionContext taskExecutionContext) throws TaskException { + public static void createTaskInstanceWorkingDirectory(TaskExecutionContext taskExecutionContext) throws TaskException { + // local execute path + String taskInstanceWorkingDirectory = FileUtils.getProcessExecDir( + taskExecutionContext.getTenantCode(), + taskExecutionContext.getProjectCode(), + taskExecutionContext.getProcessDefineCode(), + taskExecutionContext.getProcessDefineVersion(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); try { - // local execute path - String execLocalPath = FileUtils.getProcessExecDir( - taskExecutionContext.getTenantCode(), - taskExecutionContext.getProjectCode(), - taskExecutionContext.getProcessDefineCode(), - taskExecutionContext.getProcessDefineVersion(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - taskExecutionContext.setExecutePath(execLocalPath); - taskExecutionContext.setAppInfoPath(FileUtils.getAppInfoPath(execLocalPath)); - Path executePath = Paths.get(taskExecutionContext.getExecutePath()); - FileUtils.createDirectoryIfNotPresent(executePath); + Path path = Paths.get(taskInstanceWorkingDirectory); + if (Files.deleteIfExists(path)) { + log.warn("The TaskInstance WorkingDirectory: {} is exist, will recreate again", + taskInstanceWorkingDirectory); + } + Files.createDirectories(path); + taskExecutionContext.setExecutePath(taskInstanceWorkingDirectory); + + taskExecutionContext.setExecutePath(taskInstanceWorkingDirectory); + taskExecutionContext.setAppInfoPath(FileUtils.getAppInfoPath(taskInstanceWorkingDirectory)); } catch (Throwable ex) { - throw new TaskException("Cannot create process execute dir", ex); + throw new TaskException( + "Cannot create TaskInstance WorkingDirectory: " + taskInstanceWorkingDirectory + " failed", ex); } }