Browse Source

[Fix][Server] Fix clear task execute path is related to master.

yh2388 4 years ago
parent
commit
d91afd941e
  1. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  2. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -192,8 +192,6 @@ public class MasterExecThread implements Runnable {
processService.updateProcessInstance(processInstance); processService.updateProcessInstance(processInstance);
}finally { }finally {
taskExecService.shutdown(); taskExecService.shutdown();
// post handle
postHandle();
} }
} }
@ -381,27 +379,6 @@ public class MasterExecThread implements Runnable {
} }
} }
/**
* process post handle
*/
private void postHandle() {
logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
if (!CommonUtils.isDevelopMode()) {
// get exec dir
String execLocalPath = org.apache.dolphinscheduler.common.utils.FileUtils
.getProcessExecDir(processInstance.getProcessDefinition().getProjectId(),
processInstance.getProcessDefinitionId(),
processInstance.getId());
try {
FileUtils.deleteDirectory(new File(execLocalPath));
} catch (IOException e) {
logger.error("delete exec dir failed ", e);
}
}
}
/** /**
* submit task to execute * submit task to execute
* @param taskInstance task instance * @param taskInstance task instance

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
@ -40,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -146,8 +148,38 @@ public class TaskExecuteThread implements Runnable {
taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT); ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
clearTaskExecPath();
}
}
/**
* when task finish, clear execute path.
*/
private void clearTaskExecPath() {
logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
if (!CommonUtils.isDevelopMode()) {
// get exec dir
String execLocalPath = taskExecutionContext.getExecutePath();
if (StringUtils.isEmpty(execLocalPath)) {
logger.warn("task: {} exec local path is empty.", taskExecutionContext.getTaskName());
return;
} }
if ("/".equals(execLocalPath)) {
logger.warn("task: {} exec local path is '/', direct deletion is not allowed", taskExecutionContext.getTaskName());
return;
}
try {
org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
logger.info("exec local path: {} cleared.", execLocalPath);
} catch (IOException e) {
logger.error("delete exec dir failed : " + e.getMessage(), e);
}
}
} }
/** /**

Loading…
Cancel
Save