Browse Source

[Fix-6166][task-plugin]Fix Python Task Execution Error (#6167)

* fix python task execution error

* delete AbstractTask#setCommand
2.0.7-release
Martin Huang 3 years ago committed by GitHub
parent
commit
0715be34d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java
  2. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  3. 36
      dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

3
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java

@ -79,9 +79,6 @@ public abstract class AbstractTask {
return null; return null;
} }
public void setCommand(String command) throws Exception {
}
/** /**
* task handle * task handle

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -127,7 +127,7 @@ public abstract class AbstractCommandExecutor {
command.add("sudo"); command.add("sudo");
command.add("-u"); command.add("-u");
command.add(taskRequest.getTenantCode()); command.add(taskRequest.getTenantCode());
command.add(SH); command.add(commandInterpreter());
command.addAll(Collections.emptyList()); command.addAll(Collections.emptyList());
command.add(commandFile); command.add(commandFile);

36
dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

@ -21,9 +21,13 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.TaskResponse;
import org.apache.dolphinscheduler.spi.task.AbstractParameters; import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.TaskConstants; import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.Map;
/** /**
* python task * python task
@ -48,8 +52,6 @@ public class PythonTask extends AbstractTaskExecutor {
private TaskRequest taskRequest; private TaskRequest taskRequest;
private String command;
/** /**
* constructor * constructor
* *
@ -86,17 +88,12 @@ public class PythonTask extends AbstractTaskExecutor {
return rawPythonScript; return rawPythonScript;
} }
@Override
public void setCommand(String command) {
this.command = command;
}
@Override @Override
public void handle() throws Exception { public void handle() throws Exception {
try { try {
// construct process // construct process
String command = buildCommand();
TaskResponse taskResponse = pythonCommandExecutor.run(command); TaskResponse taskResponse = pythonCommandExecutor.run(command);
setExitStatusCode(taskResponse.getExitStatusCode()); setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds()); setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId()); setProcessId(taskResponse.getProcessId());
@ -151,4 +148,25 @@ public class PythonTask extends AbstractTaskExecutor {
return rawScript; return rawScript;
} }
/**
* build command
* @return raw python script
* @throws Exception exception
*/
private String buildCommand() throws Exception {
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(taskRequest,pythonParameters);
if (paramsMap != null){
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
}
logger.info("raw python script : {}", pythonParameters.getRawScript());
logger.info("task dir : {}", taskDir);
return rawPythonScript;
}
} }

Loading…
Cancel
Save