Browse Source

[Fix][Server] Fix clear task execute path is related to master (#5123)

pull/3/MERGE
Shiwen Cheng 4 years ago committed by GitHub
parent
commit
492b318bd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
  2. 4
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
  3. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  4. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  5. 45
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  6. 6
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java

23
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java

@ -121,27 +121,8 @@ public class FileUtils {
* @return directory of process execution * @return directory of process execution
*/ */
public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId, int taskInstanceId) { public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId, int taskInstanceId) {
String fileName = String.format("%s/exec/process/%s/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId), String fileName = String.format("%s/exec/process/%d/%d/%d/%d", DATA_BASEDIR,
Integer.toString(processDefineId), Integer.toString(processInstanceId), Integer.toString(taskInstanceId)); projectId, processDefineId, processInstanceId, taskInstanceId);
File file = new File(fileName);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
return fileName;
}
/**
* directory of process instances
*
* @param projectId project id
* @param processDefineId process definition id
* @param processInstanceId process instance id
* @return directory of process instances
*/
public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId) {
String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId),
Integer.toString(processDefineId), Integer.toString(processInstanceId));
File file = new File(fileName); File file = new File(fileName);
if (!file.getParentFile().exists()) { if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs(); file.getParentFile().mkdirs();

4
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java

@ -60,10 +60,8 @@ public class FileUtilsTest {
@Test @Test
public void testGetProcessExecDir() { public void testGetProcessExecDir() {
String dir = FileUtils.getProcessExecDir(1,2,3, 4); String dir = FileUtils.getProcessExecDir(1, 2, 3, 4);
Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2/3/4", dir); Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2/3/4", dir);
dir = FileUtils.getProcessExecDir(1,2,3);
Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2/3", dir);
} }
@Test @Test

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -42,7 +42,6 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
@ -61,16 +60,11 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -78,7 +72,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -236,8 +229,6 @@ public class MasterExecThread implements Runnable {
processService.updateProcessInstance(processInstance); processService.updateProcessInstance(processInstance);
} finally { } finally {
taskExecService.shutdown(); taskExecService.shutdown();
// post handle
postHandle();
} }
} }
@ -427,27 +418,6 @@ public class MasterExecThread implements Runnable {
} }
} }
/**
* process post handle
*/
private void postHandle() {
logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
if (!CommonUtils.isDevelopMode()) {
// get exec dir
String execLocalPath = org.apache.dolphinscheduler.common.utils.FileUtils
.getProcessExecDir(processInstance.getProcessDefinition().getProjectId(),
processInstance.getProcessDefinitionId(),
processInstance.getId());
try {
FileUtils.deleteDirectory(new File(execLocalPath));
} catch (IOException e) {
logger.error("delete exec dir failed ", e);
}
}
}
/** /**
* submit task to execute * submit task to execute
* *

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -141,7 +141,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
// local execute path // local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext); String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {} ", execLocalPath); logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath); taskExecutionContext.setExecutePath(execLocalPath);
FileUtils.taskLoggerThreadLocal.set(taskLogger); FileUtils.taskLoggerThreadLocal.set(taskLogger);

45
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.RetryerUtils; import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@ -45,6 +46,7 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -71,17 +73,17 @@ public class TaskExecuteThread implements Runnable, Delayed {
private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class); private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
/** /**
* task instance * task instance
*/ */
private TaskExecutionContext taskExecutionContext; private TaskExecutionContext taskExecutionContext;
/** /**
* abstract task * abstract task
*/ */
private AbstractTask task; private AbstractTask task;
/** /**
* task callback service * task callback service
*/ */
private TaskCallbackService taskCallbackService; private TaskCallbackService taskCallbackService;
@ -185,9 +187,38 @@ public class TaskExecuteThread implements Runnable, Delayed {
responseCommand.setAppIds(task.getAppIds()); responseCommand.setAppIds(task.getAppIds());
} finally { } finally {
taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT); ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
clearTaskExecPath();
}
}
/**
* when task finish, clear execute path.
*/
private void clearTaskExecPath() {
logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
if (!CommonUtils.isDevelopMode()) {
// get exec dir
String execLocalPath = taskExecutionContext.getExecutePath();
if (StringUtils.isEmpty(execLocalPath)) {
logger.warn("task: {} exec local path is empty.", taskExecutionContext.getTaskName());
return;
}
if ("/".equals(execLocalPath)) {
logger.warn("task: {} exec local path is '/', direct deletion is not allowed", taskExecutionContext.getTaskName());
return;
}
try {
org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
logger.info("exec local path: {} cleared.", execLocalPath);
} catch (IOException e) {
logger.error("delete exec dir failed : {}", e.getMessage(), e);
}
} }
} }
@ -196,7 +227,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
* @return * @return
*/ */
private Map<String, String> getGlobalParamsMap() { private Map<String, String> getGlobalParamsMap() {
Map<String,String> globalParamsMap = new HashMap<>(16); Map<String, String> globalParamsMap = new HashMap<>(16);
// global params string // global params string
String globalParamsStr = taskExecutionContext.getGlobalParams(); String globalParamsStr = taskExecutionContext.getGlobalParams();
@ -241,7 +272,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
} }
/** /**
* kill task * kill task
*/ */
public void kill() { public void kill() {
if (task != null) { if (task != null) {
@ -261,7 +292,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
* @param logger * @param logger
*/ */
private void downloadResource(String execLocalPath, private void downloadResource(String execLocalPath,
Map<String,String> projectRes, Map<String, String> projectRes,
Logger logger) throws Exception { Logger logger) throws Exception {
if (MapUtils.isEmpty(projectRes)) { if (MapUtils.isEmpty(projectRes)) {
return; return;

6
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java

@ -102,16 +102,12 @@ public class MasterExecThreadTest {
processDefinition.setGlobalParamList(Collections.EMPTY_LIST); processDefinition.setGlobalParamList(Collections.EMPTY_LIST);
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
masterExecThread = PowerMockito.spy(new MasterExecThread( masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService, null, null, config));
processInstance
, processService
, null, null, config));
// prepareProcess init dag // prepareProcess init dag
Field dag = MasterExecThread.class.getDeclaredField("dag"); Field dag = MasterExecThread.class.getDeclaredField("dag");
dag.setAccessible(true); dag.setAccessible(true);
dag.set(masterExecThread, new DAG()); dag.set(masterExecThread, new DAG());
PowerMockito.doNothing().when(masterExecThread, "executeProcess"); PowerMockito.doNothing().when(masterExecThread, "executeProcess");
PowerMockito.doNothing().when(masterExecThread, "postHandle");
PowerMockito.doNothing().when(masterExecThread, "prepareProcess"); PowerMockito.doNothing().when(masterExecThread, "prepareProcess");
PowerMockito.doNothing().when(masterExecThread, "runProcess"); PowerMockito.doNothing().when(masterExecThread, "runProcess");
PowerMockito.doNothing().when(masterExecThread, "endProcess"); PowerMockito.doNothing().when(masterExecThread, "endProcess");

Loading…
Cancel
Save