Browse Source

[Fix-6529] Improve the python task to solve the issue about getting the environment variables. (#6531)

* modify config

* improve python task

* improve python task
3.0.0/version-upgrade
Hua Jiang 3 years ago committed by GitHub
parent
commit
8200bc1452
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 205
      dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
  2. 77
      dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

205
dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java

@ -1,205 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.python;
import java.util.Arrays;
import org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.io.FileUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* python command executor
*/
public class PythonCommandExecutor extends AbstractCommandExecutor {
/**
* logger
*/
private static final Logger logger = LoggerFactory.getLogger(PythonCommandExecutor.class);
/**
* python
*/
public static final String PYTHON = "python";
private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$");
/**
* constructor
*
* @param logHandler log handler
* @param taskRequest TaskRequest
* @param logger logger
*/
public PythonCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskRequest taskRequest,
Logger logger) {
super(logHandler, taskRequest, logger);
}
/**
* build command file path
*
* @return command file path
*/
@Override
protected String buildCommandFilePath() {
return String.format("%s/py_%s.command", taskRequest.getExecutePath(), taskRequest.getTaskAppId());
}
/**
* create command file if not exists
*
* @param execCommand exec command
* @param commandFile command file
* @throws IOException io exception
*/
@Override
protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
logger.info("tenantCode :{}, task dir:{}", taskRequest.getTenantCode(), taskRequest.getExecutePath());
if (!Files.exists(Paths.get(commandFile))) {
logger.info("generate command file:{}", commandFile);
StringBuilder sb = new StringBuilder();
sb.append("#-*- encoding=utf8 -*-\n");
sb.append("\n\n");
sb.append(execCommand);
logger.info(sb.toString());
// write data to file
FileUtils.writeStringToFile(new File(commandFile),
sb.toString(),
StandardCharsets.UTF_8);
}
}
/**
* get the absolute path of the Python command
* note :
* common.properties
* PYTHON_HOME configured under common.properties is Python absolute path, not PYTHON_HOME itself
* <p>
* for example :
* your PYTHON_HOM is /opt/python3.7/
* you must set PYTHON_HOME is /opt/python3.7/python under nder common.properties
* dolphinscheduler.env.path file.
*
* @param envPath env path
* @return python home
*/
private static String getPythonHome(String envPath) {
// BufferedReader br = null;
StringBuilder sb = new StringBuilder();
try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));) {
String line;
while ((line = br.readLine()) != null) {
if (line.contains(PythonConstants.PYTHON_HOME)) {
sb.append(line);
break;
}
}
String result = sb.toString();
if (StringUtils.isEmpty(result)) {
return null;
}
String[] arrs = result.split(PythonConstants.EQUAL_SIGN);
if (arrs.length == 2) {
return arrs[1];
}
} catch (IOException e) {
logger.error("read file failure", e);
}
return null;
}
/**
* Gets the command path to which Python can execute
* @return python command path
*/
@Override
protected String commandInterpreter() {
String pythonHome = getPythonHome(taskRequest.getEnvFile());
if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
pythonHome = getPythonHomeFromEnvironmentConfig(taskRequest.getEnvironmentConfig());
}
return getPythonCommand(pythonHome);
}
/**
* get python command
*
* @param pythonHome python home
* @return python command
*/
public static String getPythonCommand(String pythonHome) {
if (StringUtils.isEmpty(pythonHome)) {
return PYTHON;
}
File file = new File(pythonHome);
if (file.exists() && file.isFile()) {
return pythonHome;
}
if (PYTHON_PATH_PATTERN.matcher(pythonHome).find()) {
return pythonHome;
}
return Paths.get(pythonHome, "/bin/python").toString();
}
/**
* get python home from the environment config
*
* @param environmentConfig env config
* @return python home
*/
public static String getPythonHomeFromEnvironmentConfig(String environmentConfig) {
String[] lines = environmentConfig.split("\n");
String pythonHomeConfig = Arrays.stream(lines).filter(line -> line.contains(PythonConstants.PYTHON_HOME)).findFirst().get();
if (StringUtils.isEmpty(pythonHomeConfig)) {
return null;
}
String[] arrs = pythonHomeConfig.split(PythonConstants.EQUAL_SIGN);
if (arrs.length == 2) {
return arrs[1];
}
return null;
}
}

77
dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.python;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
@ -29,6 +30,13 @@ import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
@ -43,14 +51,9 @@ public class PythonTask extends AbstractTaskExecutor {
private PythonParameters pythonParameters;
/**
* task dir
*/
private String taskDir;
/**
* python command executor
* shell command executor
*/
private PythonCommandExecutor pythonCommandExecutor;
private ShellCommandExecutor shellCommandExecutor;
private TaskRequest taskRequest;
@ -63,7 +66,7 @@ public class PythonTask extends AbstractTaskExecutor {
super(taskRequest);
this.taskRequest = taskRequest;
this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest,
logger);
}
@ -93,13 +96,20 @@ public class PythonTask extends AbstractTaskExecutor {
@Override
public void handle() throws Exception {
try {
// construct process
String command = buildCommand();
TaskResponse taskResponse = pythonCommandExecutor.run(command);
// generate the content of this python script
String pythonScriptContent = buildPythonScriptContent();
// generate the file path of this python script
String pythonScriptFile = buildPythonCommandFilePath();
// create this file
createPythonCommandFileIfNotExists(pythonScriptContent,pythonScriptFile);
String command = "python " + pythonScriptFile;
TaskResponse taskResponse = shellCommandExecutor.run(command);
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());
setVarPool(pythonCommandExecutor.getVarPool());
setVarPool(shellCommandExecutor.getVarPool());
} catch (Exception e) {
logger.error("python task failure", e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
@ -110,7 +120,7 @@ public class PythonTask extends AbstractTaskExecutor {
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
pythonCommandExecutor.cancelApplication();
shellCommandExecutor.cancelApplication();
}
@Override
@ -151,12 +161,48 @@ public class PythonTask extends AbstractTaskExecutor {
}
/**
* build command
* create python command file if not exists
*
* @param pythonScript exec python script
* @param pythonScriptFile python script file
* @throws IOException io exception
*/
protected void createPythonCommandFileIfNotExists(String pythonScript, String pythonScriptFile) throws IOException {
logger.info("tenantCode :{}, task dir:{}", taskRequest.getTenantCode(), taskRequest.getExecutePath());
if (!Files.exists(Paths.get(pythonScriptFile))) {
logger.info("generate python script file:{}", pythonScriptFile);
StringBuilder sb = new StringBuilder();
sb.append("#-*- encoding=utf8 -*-\n");
sb.append("\n\n");
sb.append(pythonScript);
logger.info(sb.toString());
// write data to file
FileUtils.writeStringToFile(new File(pythonScriptFile),
sb.toString(),
StandardCharsets.UTF_8);
}
}
/**
* build python command file path
*
* @return python command file path
*/
protected String buildPythonCommandFilePath() {
return String.format("%s/py_%s.py", taskRequest.getExecutePath(), taskRequest.getTaskAppId());
}
/**
* build python script content
*
* @return raw python script
* @throws Exception exception
*/
private String buildCommand() throws Exception {
private String buildPythonScriptContent() throws Exception {
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
// replace placeholder
@ -170,7 +216,6 @@ public class PythonTask extends AbstractTaskExecutor {
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
logger.info("raw python script : {}", pythonParameters.getRawScript());
logger.info("task dir : {}", taskDir);
return rawPythonScript;
}

Loading…
Cancel
Save