Browse Source

Clearer task runnable. (#13689)

* Clean unused method in AbstractTask

* Kill task by process.destroy

* wait 5/s after destroy process
3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
aef5524ee7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 63
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  2. 18
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
  3. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java
  4. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java
  5. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
  6. 16
      dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
  7. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
  8. 43
      dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
  9. 1
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

63
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.api;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.getPidsStr;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils;
@ -191,7 +190,7 @@ public abstract class AbstractCommandExecutor {
command.add(String.format("--uid=%s", taskRequest.getTenantCode())); command.add(String.format("--uid=%s", taskRequest.getTenantCode()));
} }
public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws IOException, InterruptedException { public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws Exception {
TaskResponse result = new TaskResponse(); TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId(); int taskInstanceId = taskRequest.getTaskInstanceId();
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) { if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
@ -223,8 +222,8 @@ public abstract class AbstractCommandExecutor {
boolean updateTaskExecutionContextStatus = boolean updateTaskExecutionContextStatus =
TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest); TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest);
if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) { if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
ProcessUtils.kill(taskRequest);
result.setExitStatusCode(EXIT_CODE_KILL); result.setExitStatusCode(EXIT_CODE_KILL);
cancelApplication();
return result; return result;
} }
// print process id // print process id
@ -262,14 +261,13 @@ public abstract class AbstractCommandExecutor {
} else { } else {
logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...", logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
taskRequest.getTaskTimeout()); taskRequest.getTaskTimeout());
ProcessUtils.kill(taskRequest);
result.setExitStatusCode(EXIT_CODE_FAILURE); result.setExitStatusCode(EXIT_CODE_FAILURE);
cancelApplication();
} }
int exitCode = process.exitValue(); int exitCode = process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited."; String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
logger.info(exitLogMessage logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
+ " execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}", exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
return result; return result;
} }
@ -278,53 +276,18 @@ public abstract class AbstractCommandExecutor {
return varPool.toString(); return varPool.toString();
} }
/** public void cancelApplication() throws InterruptedException {
* cancel application
*
* @throws Exception exception
*/
public void cancelApplication() throws Exception {
if (process == null) { if (process == null) {
return; return;
} }
int processId = getProcessId(process); // soft kill
logger.info("Begin to kill process process, pid is : {}", processId); logger.info("Begin to kill process process, pid is : {}", taskRequest.getProcessId());
// kill , waiting for completion process.destroy();
boolean alive = softKill(processId); if (!process.waitFor(5, TimeUnit.SECONDS)) {
process.destroyForcibly();
if (alive) {
String cmd = String.format("kill -9 %s", getPidsStr(processId));
cmd = OSUtils.getSudoCmd(taskRequest.getTenantCode(), cmd);
OSUtils.exeCmd(cmd);
logger.info("Success kill task: {}, pid: {}, cmd: {}", taskRequest.getTaskAppId(), processId, cmd);
} else {
logger.info("The process: {} is not alive, no need to kill", processId);
} }
} logger.info("Success kill task: {}, pid: {}", taskRequest.getTaskAppId(), taskRequest.getProcessId());
/**
* soft kill
*
* @param processId process id
* @return process is alive
*/
private boolean softKill(int processId) {
if (processId != 0 && process.isAlive()) {
try {
// sudo -u user command to run command
String cmd = String.format("kill %d", processId);
cmd = OSUtils.getSudoCmd(taskRequest.getTenantCode(), cmd);
logger.info("soft kill task:{}, process id:{}, cmd:{}", taskRequest.getTaskAppId(), processId, cmd);
Runtime.getRuntime().exec(cmd);
} catch (IOException e) {
logger.info("kill attempt failed", e);
}
}
return process.isAlive();
} }
private void printCommand(List<String> commands) { private void printCommand(List<String> commands) {
@ -424,7 +387,7 @@ public abstract class AbstractCommandExecutor {
f.setAccessible(true); f.setAccessible(true);
processId = f.getInt(process); processId = f.getInt(process);
} catch (Throwable e) { } catch (Exception e) {
logger.error("Get task pid failed", e); logger.error("Get task pid failed", e);
} }

18
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java

@ -55,11 +55,6 @@ public abstract class AbstractTask {
*/ */
protected int processId; protected int processId;
/**
* SHELL result string
*/
protected String resultString;
/** /**
* other resource manager appId , for example : YARN etc * other resource manager appId , for example : YARN etc
*/ */
@ -89,10 +84,7 @@ public abstract class AbstractTask {
public void init() { public void init() {
} }
public String getPreScript() { // todo: return TaskResult rather than store the result in Task
return null;
}
public abstract void handle(TaskCallBack taskCallBack) throws TaskException; public abstract void handle(TaskCallBack taskCallBack) throws TaskException;
public abstract void cancel() throws TaskException; public abstract void cancel() throws TaskException;
@ -126,14 +118,6 @@ public abstract class AbstractTask {
this.processId = processId; this.processId = processId;
} }
public String getResultString() {
return resultString;
}
public void setResultString(String resultString) {
this.resultString = resultString;
}
public String getAppIds() { public String getAppIds() {
return appIds; return appIds;
} }

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java

@ -25,8 +25,10 @@ public interface TaskChannel {
void cancelApplication(boolean status); void cancelApplication(boolean status);
// todo: return ITask
AbstractTask createTask(TaskExecutionContext taskRequest); AbstractTask createTask(TaskExecutionContext taskRequest);
// todo: return IParameters
AbstractParameters parseParameters(ParametersNode parametersNode); AbstractParameters parseParameters(ParametersNode parametersNode);
ResourceParametersHelper getResources(String parameters); ResourceParametersHelper getResources(String parameters);

3
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java

@ -1,4 +1,5 @@
package org.apache.dolphinscheduler.plugin.task.api.parameters;/* package org.apache.dolphinscheduler.plugin.task.api.parameters;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.

1
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java

@ -79,6 +79,7 @@ public final class ProcessUtils {
/** /**
* kill tasks according to different task types. * kill tasks according to different task types.
*/ */
@Deprecated
public static boolean kill(@NonNull TaskExecutionContext request) { public static boolean kill(@NonNull TaskExecutionContext request) {
try { try {
log.info("Begin kill task instance, processId: {}", request.getProcessId()); log.info("Begin kill task instance, processId: {}", request.getProcessId());

16
dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java

@ -99,22 +99,6 @@ public class JavaTask extends AbstractTask {
log.info("Initialize java task params {}", JSONUtils.toPrettyJsonString(javaParameters)); log.info("Initialize java task params {}", JSONUtils.toPrettyJsonString(javaParameters));
} }
/**
* Gets the Java source file that was initially processed
*
* @return String
**/
@Override
public String getPreScript() {
String rawJavaScript = javaParameters.getRawScript().replaceAll("\\r\\n", "\n");
try {
rawJavaScript = convertJavaSourceCodePlaceholders(rawJavaScript);
} catch (StringIndexOutOfBoundsException e) {
log.error("setShareVar field format error, raw java script: {}", rawJavaScript);
}
return rawJavaScript;
}
/** /**
* Execute Java tasks * Execute Java tasks
* *

6
dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java

@ -70,12 +70,6 @@ public class OpenmldbTask extends PythonTask {
} }
} }
@Override
@Deprecated
public String getPreScript() {
return "";
}
/** /**
* build python command file path * build python command file path
* *

43
dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

@ -87,17 +87,6 @@ public class PythonTask extends AbstractTask {
} }
} }
@Override
public String getPreScript() {
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
try {
rawPythonScript = convertPythonScriptPlaceholders(rawPythonScript);
} catch (StringIndexOutOfBoundsException e) {
log.error("setShareVar field format error, raw python script : {}", rawPythonScript);
}
return rawPythonScript;
}
@Override @Override
public void handle(TaskCallBack taskCallBack) throws TaskException { public void handle(TaskCallBack taskCallBack) throws TaskException {
try { try {
@ -137,38 +126,6 @@ public class PythonTask extends AbstractTask {
return pythonParameters; return pythonParameters;
} }
/**
* convertPythonScriptPlaceholders
*
* @param rawScript rawScript
* @return String
* @throws StringIndexOutOfBoundsException if substring index is out of bounds
*/
private static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException {
int len = "${setShareVar(${".length();
int scriptStart = 0;
while ((scriptStart = rawScript.indexOf("${setShareVar(${", scriptStart)) != -1) {
int start = -1;
int end = rawScript.indexOf('}', scriptStart + len);
String prop = rawScript.substring(scriptStart + len, end);
start = rawScript.indexOf(',', end);
end = rawScript.indexOf(')', start);
String value = rawScript.substring(start + 1, end);
start = rawScript.indexOf('}', start) + 1;
end = rawScript.length();
String replaceScript = String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value);
rawScript = rawScript.substring(0, scriptStart) + replaceScript + rawScript.substring(start, end);
scriptStart += replaceScript.length();
}
return rawScript;
}
/** /**
* create python command file if not exists * create python command file if not exists
* *

1
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -186,6 +186,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @param processId * @param processId
*/ */
protected boolean killProcess(String tenantCode, Integer processId) { protected boolean killProcess(String tenantCode, Integer processId) {
// todo: directly interrupt the process
boolean processFlag = true; boolean processFlag = true;
if (processId == null || processId.equals(0)) { if (processId == null || processId.equals(0)) {
return true; return true;

Loading…
Cancel
Save