From 8200bc14526364fa1908b1df82befc984b0419aa Mon Sep 17 00:00:00 2001 From: Hua Jiang Date: Mon, 18 Oct 2021 17:33:16 +0800 Subject: [PATCH] [Fix-6529] Improve the python task to solve the issue about getting the environment variables. (#6531) * modify config * improve python task * improve python task --- .../task/python/PythonCommandExecutor.java | 205 ------------------ .../plugin/task/python/PythonTask.java | 77 +++++-- 2 files changed, 61 insertions(+), 221 deletions(-) delete mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java deleted file mode 100644 index d40f61b829..0000000000 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.task.python; - -import java.util.Arrays; -import org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor; -import org.apache.dolphinscheduler.spi.task.request.TaskRequest; -import org.apache.dolphinscheduler.spi.utils.StringUtils; - -import org.apache.commons.io.FileUtils; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.Consumer; -import java.util.regex.Pattern; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * python command executor - */ -public class PythonCommandExecutor extends AbstractCommandExecutor { - - /** - * logger - */ - private static final Logger logger = LoggerFactory.getLogger(PythonCommandExecutor.class); - - /** - * python - */ - public static final String PYTHON = "python"; - - private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$"); - - /** - * constructor - * - * @param logHandler log handler - * @param taskRequest TaskRequest - * @param logger logger - */ - public PythonCommandExecutor(Consumer> logHandler, - TaskRequest taskRequest, - Logger logger) { - super(logHandler, taskRequest, logger); - } - - - /** - * build command file path - * - * @return command file path - */ - @Override - protected String buildCommandFilePath() { - return String.format("%s/py_%s.command", taskRequest.getExecutePath(), taskRequest.getTaskAppId()); - } - - /** - * create command file if not exists - * - * @param execCommand exec command - * @param commandFile command file - * @throws IOException io exception - */ - @Override - protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException { - logger.info("tenantCode :{}, task dir:{}", taskRequest.getTenantCode(), taskRequest.getExecutePath()); - - if (!Files.exists(Paths.get(commandFile))) { - logger.info("generate command file:{}", commandFile); - - StringBuilder sb = new StringBuilder(); - sb.append("#-*- encoding=utf8 -*-\n"); - - sb.append("\n\n"); - sb.append(execCommand); - logger.info(sb.toString()); - - // write data to file - FileUtils.writeStringToFile(new File(commandFile), - sb.toString(), - StandardCharsets.UTF_8); - } - } - - /** - * get the absolute path of the Python command - * note : - * common.properties - * PYTHON_HOME configured under common.properties is Python absolute path, not PYTHON_HOME itself - *

- * for example : - * your PYTHON_HOM is /opt/python3.7/ - * you must set PYTHON_HOME is /opt/python3.7/python under nder common.properties - * dolphinscheduler.env.path file. - * - * @param envPath env path - * @return python home - */ - private static String getPythonHome(String envPath) { - // BufferedReader br = null; - StringBuilder sb = new StringBuilder(); - try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));) { - String line; - while ((line = br.readLine()) != null) { - if (line.contains(PythonConstants.PYTHON_HOME)) { - sb.append(line); - break; - } - } - String result = sb.toString(); - if (StringUtils.isEmpty(result)) { - return null; - } - String[] arrs = result.split(PythonConstants.EQUAL_SIGN); - if (arrs.length == 2) { - return arrs[1]; - } - } catch (IOException e) { - logger.error("read file failure", e); - } - return null; - } - - /** - * Gets the command path to which Python can execute - * @return python command path - */ - @Override - protected String commandInterpreter() { - String pythonHome = getPythonHome(taskRequest.getEnvFile()); - - if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) { - pythonHome = getPythonHomeFromEnvironmentConfig(taskRequest.getEnvironmentConfig()); - } - - return getPythonCommand(pythonHome); - } - - /** - * get python command - * - * @param pythonHome python home - * @return python command - */ - public static String getPythonCommand(String pythonHome) { - if (StringUtils.isEmpty(pythonHome)) { - return PYTHON; - } - File file = new File(pythonHome); - if (file.exists() && file.isFile()) { - return pythonHome; - } - if (PYTHON_PATH_PATTERN.matcher(pythonHome).find()) { - return pythonHome; - } - return Paths.get(pythonHome, "/bin/python").toString(); - } - - /** - * get python home from the environment config - * - * @param environmentConfig env config - * @return python home - */ - public static String getPythonHomeFromEnvironmentConfig(String environmentConfig) { - String[] lines = environmentConfig.split("\n"); - - String pythonHomeConfig = Arrays.stream(lines).filter(line -> line.contains(PythonConstants.PYTHON_HOME)).findFirst().get(); - - if (StringUtils.isEmpty(pythonHomeConfig)) { - return null; - } - String[] arrs = pythonHomeConfig.split(PythonConstants.EQUAL_SIGN); - if (arrs.length == 2) { - return arrs[1]; - } - return null; - } -} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java index 01457da642..382f1778b9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.plugin.task.python; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; +import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskResponse; import org.apache.dolphinscheduler.plugin.task.util.MapUtils; @@ -29,6 +30,13 @@ import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; @@ -43,14 +51,9 @@ public class PythonTask extends AbstractTaskExecutor { private PythonParameters pythonParameters; /** - * task dir - */ - private String taskDir; - - /** - * python command executor + * shell command executor */ - private PythonCommandExecutor pythonCommandExecutor; + private ShellCommandExecutor shellCommandExecutor; private TaskRequest taskRequest; @@ -63,7 +66,7 @@ public class PythonTask extends AbstractTaskExecutor { super(taskRequest); this.taskRequest = taskRequest; - this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle, + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest, logger); } @@ -93,13 +96,20 @@ public class PythonTask extends AbstractTaskExecutor { @Override public void handle() throws Exception { try { - // construct process - String command = buildCommand(); - TaskResponse taskResponse = pythonCommandExecutor.run(command); + // generate the content of this python script + String pythonScriptContent = buildPythonScriptContent(); + // generate the file path of this python script + String pythonScriptFile = buildPythonCommandFilePath(); + + // create this file + createPythonCommandFileIfNotExists(pythonScriptContent,pythonScriptFile); + String command = "python " + pythonScriptFile; + + TaskResponse taskResponse = shellCommandExecutor.run(command); setExitStatusCode(taskResponse.getExitStatusCode()); setAppIds(taskResponse.getAppIds()); setProcessId(taskResponse.getProcessId()); - setVarPool(pythonCommandExecutor.getVarPool()); + setVarPool(shellCommandExecutor.getVarPool()); } catch (Exception e) { logger.error("python task failure", e); setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); @@ -110,7 +120,7 @@ public class PythonTask extends AbstractTaskExecutor { @Override public void cancelApplication(boolean cancelApplication) throws Exception { // cancel process - pythonCommandExecutor.cancelApplication(); + shellCommandExecutor.cancelApplication(); } @Override @@ -151,12 +161,48 @@ public class PythonTask extends AbstractTaskExecutor { } /** - * build command + * create python command file if not exists + * + * @param pythonScript exec python script + * @param pythonScriptFile python script file + * @throws IOException io exception + */ + protected void createPythonCommandFileIfNotExists(String pythonScript, String pythonScriptFile) throws IOException { + logger.info("tenantCode :{}, task dir:{}", taskRequest.getTenantCode(), taskRequest.getExecutePath()); + + if (!Files.exists(Paths.get(pythonScriptFile))) { + logger.info("generate python script file:{}", pythonScriptFile); + + StringBuilder sb = new StringBuilder(); + sb.append("#-*- encoding=utf8 -*-\n"); + + sb.append("\n\n"); + sb.append(pythonScript); + logger.info(sb.toString()); + + // write data to file + FileUtils.writeStringToFile(new File(pythonScriptFile), + sb.toString(), + StandardCharsets.UTF_8); + } + } + + /** + * build python command file path + * + * @return python command file path + */ + protected String buildPythonCommandFilePath() { + return String.format("%s/py_%s.py", taskRequest.getExecutePath(), taskRequest.getTaskAppId()); + } + + /** + * build python script content * * @return raw python script * @throws Exception exception */ - private String buildCommand() throws Exception { + private String buildPythonScriptContent() throws Exception { String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); // replace placeholder @@ -170,7 +216,6 @@ public class PythonTask extends AbstractTaskExecutor { rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); logger.info("raw python script : {}", pythonParameters.getRawScript()); - logger.info("task dir : {}", taskDir); return rawPythonScript; }