From 492b318bd321d35247488e1f181e3ea9d1259963 Mon Sep 17 00:00:00 2001 From: Shiwen Cheng Date: Fri, 26 Mar 2021 10:11:56 +0800 Subject: [PATCH] [Fix][Server] Fix clear task execute path is related to master (#5123) --- .../common/utils/FileUtils.java | 23 +--------- .../common/utils/FileUtilsTest.java | 4 +- .../master/runner/MasterExecThread.java | 30 ------------- .../processor/TaskExecuteProcessor.java | 2 +- .../worker/runner/TaskExecuteThread.java | 45 ++++++++++++++++--- .../server/master/MasterExecThreadTest.java | 6 +-- 6 files changed, 43 insertions(+), 67 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java index 0dcfbddaf4..ae6291a854 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java @@ -121,27 +121,8 @@ public class FileUtils { * @return directory of process execution */ public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId, int taskInstanceId) { - String fileName = String.format("%s/exec/process/%s/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId), - Integer.toString(processDefineId), Integer.toString(processInstanceId), Integer.toString(taskInstanceId)); - File file = new File(fileName); - if (!file.getParentFile().exists()) { - file.getParentFile().mkdirs(); - } - - return fileName; - } - - /** - * directory of process instances - * - * @param projectId project id - * @param processDefineId process definition id - * @param processInstanceId process instance id - * @return directory of process instances - */ - public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId) { - String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId), - Integer.toString(processDefineId), Integer.toString(processInstanceId)); + String fileName = String.format("%s/exec/process/%d/%d/%d/%d", DATA_BASEDIR, + projectId, processDefineId, processInstanceId, taskInstanceId); File file = new File(fileName); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java index a4a39ae252..a1ddef158f 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java @@ -60,10 +60,8 @@ public class FileUtilsTest { @Test public void testGetProcessExecDir() { - String dir = FileUtils.getProcessExecDir(1,2,3, 4); + String dir = FileUtils.getProcessExecDir(1, 2, 3, 4); Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2/3/4", dir); - dir = FileUtils.getProcessExecDir(1,2,3); - Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2/3", dir); } @Test diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index b7a4d00380..b9ad8f37b6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -42,7 +42,6 @@ import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; @@ -61,16 +60,11 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; -import org.apache.commons.io.FileUtils; - -import java.io.File; -import java.io.IOException; import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Date; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -78,7 +72,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -236,8 +229,6 @@ public class MasterExecThread implements Runnable { processService.updateProcessInstance(processInstance); } finally { taskExecService.shutdown(); - // post handle - postHandle(); } } @@ -427,27 +418,6 @@ public class MasterExecThread implements Runnable { } } - /** - * process post handle - */ - private void postHandle() { - logger.info("develop mode is: {}", CommonUtils.isDevelopMode()); - - if (!CommonUtils.isDevelopMode()) { - // get exec dir - String execLocalPath = org.apache.dolphinscheduler.common.utils.FileUtils - .getProcessExecDir(processInstance.getProcessDefinition().getProjectId(), - processInstance.getProcessDefinitionId(), - processInstance.getId()); - - try { - FileUtils.deleteDirectory(new File(execLocalPath)); - } catch (IOException e) { - logger.error("delete exec dir failed ", e); - } - } - } - /** * submit task to execute * diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index cfd2c3fe2f..f03d86bb65 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -141,7 +141,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { // local execute path String execLocalPath = getExecLocalPath(taskExecutionContext); - logger.info("task instance local execute path : {} ", execLocalPath); + logger.info("task instance local execute path : {}", execLocalPath); taskExecutionContext.setExecutePath(execLocalPath); FileUtils.taskLoggerThreadLocal.set(taskLogger); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index c036ac9f69..409c2b7a2e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.RetryerUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -45,6 +46,7 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.commons.collections.MapUtils; import java.io.File; +import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -71,17 +73,17 @@ public class TaskExecuteThread implements Runnable, Delayed { private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class); /** - * task instance + * task instance */ private TaskExecutionContext taskExecutionContext; /** - * abstract task + * abstract task */ private AbstractTask task; /** - * task callback service + * task callback service */ private TaskCallbackService taskCallbackService; @@ -185,9 +187,38 @@ public class TaskExecuteThread implements Runnable, Delayed { responseCommand.setAppIds(task.getAppIds()); } finally { taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT); + ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); + clearTaskExecPath(); + } + } + + /** + * when task finish, clear execute path. + */ + private void clearTaskExecPath() { + logger.info("develop mode is: {}", CommonUtils.isDevelopMode()); + + if (!CommonUtils.isDevelopMode()) { + // get exec dir + String execLocalPath = taskExecutionContext.getExecutePath(); + + if (StringUtils.isEmpty(execLocalPath)) { + logger.warn("task: {} exec local path is empty.", taskExecutionContext.getTaskName()); + return; + } + if ("/".equals(execLocalPath)) { + logger.warn("task: {} exec local path is '/', direct deletion is not allowed", taskExecutionContext.getTaskName()); + return; + } + + try { + org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath)); + logger.info("exec local path: {} cleared.", execLocalPath); + } catch (IOException e) { + logger.error("delete exec dir failed : {}", e.getMessage(), e); + } } } @@ -196,7 +227,7 @@ public class TaskExecuteThread implements Runnable, Delayed { * @return */ private Map getGlobalParamsMap() { - Map globalParamsMap = new HashMap<>(16); + Map globalParamsMap = new HashMap<>(16); // global params string String globalParamsStr = taskExecutionContext.getGlobalParams(); @@ -241,7 +272,7 @@ public class TaskExecuteThread implements Runnable, Delayed { } /** - * kill task + * kill task */ public void kill() { if (task != null) { @@ -261,7 +292,7 @@ public class TaskExecuteThread implements Runnable, Delayed { * @param logger */ private void downloadResource(String execLocalPath, - Map projectRes, + Map projectRes, Logger logger) throws Exception { if (MapUtils.isEmpty(projectRes)) { return; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java index 6979a939e7..cdea2526af 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -102,16 +102,12 @@ public class MasterExecThreadTest { processDefinition.setGlobalParamList(Collections.EMPTY_LIST); Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); - masterExecThread = PowerMockito.spy(new MasterExecThread( - processInstance - , processService - , null, null, config)); + masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService, null, null, config)); // prepareProcess init dag Field dag = MasterExecThread.class.getDeclaredField("dag"); dag.setAccessible(true); dag.set(masterExecThread, new DAG()); PowerMockito.doNothing().when(masterExecThread, "executeProcess"); - PowerMockito.doNothing().when(masterExecThread, "postHandle"); PowerMockito.doNothing().when(masterExecThread, "prepareProcess"); PowerMockito.doNothing().when(masterExecThread, "runProcess"); PowerMockito.doNothing().when(masterExecThread, "endProcess");