diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index e8c9833e0e..368d33d7a4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -192,8 +192,6 @@ public class MasterExecThread implements Runnable { processService.updateProcessInstance(processInstance); }finally { taskExecService.shutdown(); - // post handle - postHandle(); } } @@ -381,27 +379,6 @@ public class MasterExecThread implements Runnable { } } - /** - * process post handle - */ - private void postHandle() { - logger.info("develop mode is: {}", CommonUtils.isDevelopMode()); - - if (!CommonUtils.isDevelopMode()) { - // get exec dir - String execLocalPath = org.apache.dolphinscheduler.common.utils.FileUtils - .getProcessExecDir(processInstance.getProcessDefinition().getProjectId(), - processInstance.getProcessDefinitionId(), - processInstance.getId()); - - try { - FileUtils.deleteDirectory(new File(execLocalPath)); - } catch (IOException e) { - logger.error("delete exec dir failed ", e); - } - } - } - /** * submit task to execute * @param taskInstance task instance diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 0f4525dee1..81f14e7ea9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.runner; import com.alibaba.fastjson.JSONObject; @@ -25,6 +26,7 @@ import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; @@ -34,16 +36,21 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; -import java.util.*; +import java.io.IOException; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * task scheduler thread + * task scheduler thread */ public class TaskExecuteThread implements Runnable { @@ -53,17 +60,17 @@ public class TaskExecuteThread implements Runnable { private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class); /** - * task instance + * task instance */ private TaskExecutionContext taskExecutionContext; /** - * abstract task + * abstract task */ private AbstractTask task; /** - * task callback service + * task callback service */ private TaskCallbackService taskCallbackService; @@ -132,7 +139,7 @@ public class TaskExecuteThread implements Runnable { responseCommand.setProcessId(task.getProcessId()); responseCommand.setAppIds(task.getAppIds()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); - }catch (Exception e){ + } catch (Exception e) { logger.error("task scheduler failure", e); kill(); responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); @@ -141,18 +148,47 @@ public class TaskExecuteThread implements Runnable { responseCommand.setAppIds(task.getAppIds()); } finally { taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT); + ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); + clearTaskExecPath(); + } + } + + /** + * when task finish, clear execute path. + */ + private void clearTaskExecPath() { + logger.info("develop mode is: {}", CommonUtils.isDevelopMode()); + + if (!CommonUtils.isDevelopMode()) { + // get exec dir + String execLocalPath = taskExecutionContext.getExecutePath(); + if (StringUtils.isEmpty(execLocalPath)) { + logger.warn("task: {} exec local path is empty.", taskExecutionContext.getTaskName()); + return; + } + + if ("/".equals(execLocalPath)) { + logger.warn("task: {} exec local path is '/', direct deletion is not allowed", taskExecutionContext.getTaskName()); + return; + } + + try { + org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath)); + logger.info("exec local path: {} cleared.", execLocalPath); + } catch (IOException e) { + logger.error("delete exec dir failed : {}", e.getMessage(), e); + } } + } /** * get global paras map - * @return */ private Map getGlobalParamsMap() { - Map globalParamsMap = new HashMap<>(16); + Map globalParamsMap = new HashMap<>(16); // global params string String globalParamsStr = taskExecutionContext.getGlobalParams(); @@ -165,17 +201,17 @@ public class TaskExecuteThread implements Runnable { /** * set task timeout + * * @param taskExecutionContext TaskExecutionContext - * @param taskNode */ private void setTaskTimeout(TaskExecutionContext taskExecutionContext, TaskNode taskNode) { // the default timeout is the maximum value of the integer taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE); TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter(); - if (taskTimeoutParameter.getEnable()){ + if (taskTimeoutParameter.getEnable()) { // get timeout strategy taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode()); - switch (taskTimeoutParameter.getStrategy()){ + switch (taskTimeoutParameter.getStrategy()) { case WARN: break; case FAILED: @@ -196,38 +232,32 @@ public class TaskExecuteThread implements Runnable { } } - /** - * kill task + * kill task */ - public void kill(){ - if (task != null){ + public void kill() { + if (task != null) { try { task.cancelApplication(true); - }catch (Exception e){ - logger.error(e.getMessage(),e); + } catch (Exception e) { + logger.error(e.getMessage(), e); } } } - /** * download resource file - * - * @param execLocalPath - * @param projectRes - * @param logger */ private void downloadResource(String execLocalPath, - Map projectRes, + Map projectRes, Logger logger) throws Exception { - if (MapUtils.isEmpty(projectRes)){ + if (MapUtils.isEmpty(projectRes)) { return; } Set> resEntries = projectRes.entrySet(); - for (Map.Entry resource : resEntries) { + for (Map.Entry resource : resEntries) { String fullName = resource.getKey(); String tenantCode = resource.getValue(); File resFile = new File(execLocalPath, fullName); @@ -238,8 +268,8 @@ public class TaskExecuteThread implements Runnable { logger.info("get resource file from hdfs :{}", resHdfsPath); HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true); - }catch (Exception e){ - logger.error(e.getMessage(),e); + } catch (Exception e) { + logger.error(e.getMessage(), e); throw new RuntimeException(e.getMessage()); } } else { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java new file mode 100644 index 0000000000..e953868d1b --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import static org.powermock.api.mockito.PowerMockito.mock; + +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({TaskExecuteThread.class}) +public class TaskExecuteThreadTest { + + private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThreadTest.class); + + TaskExecutionContext taskExecutionContext; + + TaskCallbackService taskCallbackService; + + ApplicationContext applicationContext; + + TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager; + + private ProcessService processService; + + @Before + public void init() throws Exception { + taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); + taskCallbackService = PowerMockito.mock(TaskCallbackService.class); + applicationContext = PowerMockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl(); + Mockito.when(applicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)).thenReturn(taskExecutionContextCacheManager); + } + + @Test + public void testTaskClearExecPath() throws Exception { + processService = mock(ProcessService.class); + ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); + TaskCallbackService taskCallbackService = Mockito.mock(TaskCallbackService.class); + TaskExecuteThread taskExecuteThread = PowerMockito.spy(new TaskExecuteThread(taskExecutionContext, taskCallbackService, logger)); + Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/"); + Assert.assertTrue(true); + } + + @Test + public void testClearTaskExecPath() { + + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, logger); + Mockito.when(CommonUtils.isDevelopMode()).thenReturn(false); + Mockito.when(taskExecutionContext.getTaskJson()).thenThrow(new RuntimeException("测试异常后finally执行")); + try { + taskExecuteThread.run(); + } catch (Exception ignored) { + //ignored + } + + Mockito.when(taskExecutionContext.getExecutePath()).thenReturn(null); + try { + taskExecuteThread.run(); + } catch (Exception ignored) { + //ignored + } + + Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/"); + try { + taskExecuteThread.run(); + } catch (Exception ignored) { + //ignored + } + + Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/data/test-testClearTaskExecPath"); + try { + taskExecuteThread.run(); + } catch (Exception ignored) { + //ignored + } + + Assert.assertTrue(true); + + } + + @Test + public void testNotClearTaskExecPath() { + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, logger); + Mockito.when(CommonUtils.isDevelopMode()).thenReturn(true); + taskExecuteThread.run(); + Assert.assertTrue(true); + } +} diff --git a/pom.xml b/pom.xml index d7d0847a4f..2a986f0015 100644 --- a/pom.xml +++ b/pom.xml @@ -794,6 +794,7 @@ **/server/log/WorkerLogFilterTest.java **/server/master/consumer/TaskPriorityQueueConsumerTest.java **/server/master/runner/MasterTaskExecThreadTest.java + **/server/worker/runner/TaskExecuteThreadTest.java **/server/master/dispatch/executor/NettyExecutorManagerTest.java **/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java **/server/master/dispatch/host/assign/RandomSelectorTest.java