diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java index 2ea57106ad..2aa80f1ace 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java @@ -14,24 +14,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collections; -import java.util.Date; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * python command executor @@ -47,7 +53,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { * python */ public static final String PYTHON = "python"; - + private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$"); /** * constructor @@ -110,64 +116,70 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { } /** - * get python home - * @return python home + * Gets the command path to which Python can execute + * @return python command path */ @Override protected String commandInterpreter() { String pythonHome = getPythonHome(taskExecutionContext.getEnvFile()); - if (StringUtils.isEmpty(pythonHome)){ + return getPythonCommand(pythonHome); + } + + /** + * get python command + * + * @param pythonHome python home + * @return python command + */ + public static String getPythonCommand(String pythonHome) { + if (StringUtils.isEmpty(pythonHome)) { return PYTHON; } - return pythonHome; + File file = new File(pythonHome); + if (file.exists() && file.isFile()) { + return pythonHome; + } + if (PYTHON_PATH_PATTERN.matcher(pythonHome).find()) { + return pythonHome; + } + return Paths.get(pythonHome, "/bin/python").toString(); } - - /** - * get the absolute path of the Python command - * note : - * common.properties - * PYTHON_HOME configured under common.properties is Python absolute path, not PYTHON_HOME itself - * - * for example : - * your PYTHON_HOM is /opt/python3.7/ - * you must set PYTHON_HOME is /opt/python3.7/python under nder common.properties - * dolphinscheduler.env.path file. + * get python home * * @param envPath env path * @return python home */ - private static String getPythonHome(String envPath){ + public static String getPythonHome(String envPath) { BufferedReader br = null; StringBuilder sb = new StringBuilder(); try { br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath))); String line; - while ((line = br.readLine()) != null){ - if (line.contains(Constants.PYTHON_HOME)){ + while ((line = br.readLine()) != null) { + if (line.contains(Constants.PYTHON_HOME)) { sb.append(line); break; } } String result = sb.toString(); - if (org.apache.commons.lang.StringUtils.isEmpty(result)){ + if (StringUtils.isEmpty(result)) { return null; } String[] arrs = result.split(Constants.EQUAL_SIGN); - if (arrs.length == 2){ + if (arrs.length == 2) { return arrs[1]; } - - }catch (IOException e){ - logger.error("read file failure",e); - }finally { + } catch (IOException e) { + logger.error("read file failure", e); + } finally { try { - if (br != null){ + if (br != null) { br.close(); } } catch (IOException e) { - logger.error(e.getMessage(),e); + logger.error(e.getMessage(), e); } } return null; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index fa354dceb3..afb9115b37 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -14,8 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.task.datax; +package org.apache.dolphinscheduler.server.worker.task.datax; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; @@ -44,6 +45,7 @@ import java.io.File; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; @@ -58,6 +60,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.slf4j.Logger; @@ -72,7 +76,6 @@ import com.alibaba.druid.sql.ast.statement.SQLUnionQuery; import com.alibaba.druid.sql.parser.SQLStatementParser; import com.alibaba.fastjson.JSONObject; - /** * DataX task */ @@ -82,6 +85,7 @@ public class DataxTask extends AbstractTask { * python process(datax only supports version 2.7 by default) */ private static final String DATAX_PYTHON = "python2.7"; + private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$"); /** * datax path @@ -366,7 +370,7 @@ public class DataxTask extends AbstractTask { // datax python command StringBuilder sbr = new StringBuilder(); - sbr.append(DATAX_PYTHON); + sbr.append(getPythonCommand()); sbr.append(" "); sbr.append(DATAX_PATH); sbr.append(" "); @@ -392,6 +396,23 @@ public class DataxTask extends AbstractTask { return fileName; } + public String getPythonCommand() { + String pythonHome = System.getenv("PYTHON_HOME"); + return getPythonCommand(pythonHome); + } + + public String getPythonCommand(String pythonHome) { + if (StringUtils.isEmpty(pythonHome)) { + return DATAX_PYTHON; + } + String pythonBinPath = "/bin/" + DATAX_PYTHON; + Matcher matcher = PYTHON_PATH_PATTERN.matcher(pythonHome); + if (matcher.find()) { + return matcher.replaceAll(pythonBinPath); + } + return Paths.get(pythonHome, pythonBinPath).toString(); + } + /** * parsing synchronized column names in SQL statements * diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java deleted file mode 100644 index 7ed1522600..0000000000 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.server.worker; - -import org.apache.commons.lang.StringUtils; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; - - -public class EnvFileTest { - - private static final Logger logger = LoggerFactory.getLogger(EnvFileTest.class); - - @Test - public void test() { - String path = System.getProperty("user.dir")+"/script/env/dolphinscheduler_env.sh"; - String pythonHome = getPythonHome(path); - logger.info(pythonHome); - } - - /** - * get python home - * @param path - * @return - */ - private static String getPythonHome(String path){ - BufferedReader br = null; - String line = null; - StringBuilder sb = new StringBuilder(); - try { - br = new BufferedReader(new InputStreamReader(new FileInputStream(path))); - while ((line = br.readLine()) != null){ - if (line.contains("PYTHON_HOME")){ - sb.append(line); - break; - } - } - String result = sb.toString(); - if (StringUtils.isEmpty(result)){ - return null; - } - String[] arrs = result.split("="); - if (arrs.length == 2){ - return arrs[1]; - } - - }catch (IOException e){ - logger.error("read file failed",e); - }finally { - try { - if (br != null){ - br.close(); - } - } catch (IOException e) { - logger.error(e.getMessage(),e); - } - } - return null; - } -} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutorTest.java new file mode 100644 index 0000000000..f4e9080a68 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutorTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.task; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PythonCommandExecutorTest { + + private static final Logger logger = LoggerFactory.getLogger(PythonCommandExecutorTest.class); + + @Test + public void testGetPythonHome() { + String path = System.getProperty("user.dir") + "/script/env/dolphinscheduler_env.sh"; + if (path.contains("dolphinscheduler-server/")) { + path = path.replace("dolphinscheduler-server/", ""); + } + String pythonHome = PythonCommandExecutor.getPythonHome(path); + logger.info(pythonHome); + Assert.assertNotNull(pythonHome); + } + + @Test + public void testGetPythonCommand() { + String pythonCommand = PythonCommandExecutor.getPythonCommand(null); + Assert.assertEquals(PythonCommandExecutor.PYTHON, pythonCommand); + pythonCommand = PythonCommandExecutor.getPythonCommand(""); + Assert.assertEquals(PythonCommandExecutor.PYTHON, pythonCommand); + pythonCommand = PythonCommandExecutor.getPythonCommand("/usr/bin/python"); + Assert.assertEquals("/usr/bin/python", pythonCommand); + pythonCommand = PythonCommandExecutor.getPythonCommand("/usr/local/bin/python2"); + Assert.assertEquals("/usr/local/bin/python2", pythonCommand); + pythonCommand = PythonCommandExecutor.getPythonCommand("/opt/python/bin/python3.8"); + Assert.assertEquals("/opt/python/bin/python3.8", pythonCommand); + pythonCommand = PythonCommandExecutor.getPythonCommand("/opt/soft/python"); + Assert.assertEquals("/opt/soft/python/bin/python", pythonCommand); + pythonCommand = PythonCommandExecutor.getPythonCommand("/opt/soft/python-3.8"); + Assert.assertEquals("/opt/soft/python-3.8/bin/python", pythonCommand); + } + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java index 041f81d62b..9d250e38c4 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java @@ -14,16 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.task.datax; +package org.apache.dolphinscheduler.server.worker.task.datax; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.Date; -import java.util.List; -import java.util.Map; +import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS; -import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; @@ -36,6 +31,13 @@ import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Map; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -46,7 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; -import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS; +import com.alibaba.fastjson.JSONObject; /** * DataxTask Tester. @@ -393,4 +395,20 @@ public class DataxTaskTest { } } + @Test + public void testGetPythonCommand() { + String pythonCommand = dataxTask.getPythonCommand(); + Assert.assertEquals("python2.7", pythonCommand); + pythonCommand = dataxTask.getPythonCommand(""); + Assert.assertEquals("python2.7", pythonCommand); + pythonCommand = dataxTask.getPythonCommand("/usr/bin/python"); + Assert.assertEquals("/usr/bin/python2.7", pythonCommand); + pythonCommand = dataxTask.getPythonCommand("/usr/local/bin/python2"); + Assert.assertEquals("/usr/local/bin/python2.7", pythonCommand); + pythonCommand = dataxTask.getPythonCommand("/opt/python/bin/python3.8"); + Assert.assertEquals("/opt/python/bin/python2.7", pythonCommand); + pythonCommand = dataxTask.getPythonCommand("/opt/soft/python"); + Assert.assertEquals("/opt/soft/python/bin/python2.7", pythonCommand); + } + } diff --git a/pom.xml b/pom.xml index 6c82db9f5d..7fb58c5537 100644 --- a/pom.xml +++ b/pom.xml @@ -839,10 +839,10 @@ **/server/worker/shell/ShellCommandExecutorTest.java **/server/worker/sql/SqlExecutorTest.java **/server/worker/task/spark/SparkTaskTest.java + **/server/worker/task/spark/SparkTaskTest.java **/server/worker/task/EnvFileTest.java **/server/worker/task/http/HttpTaskTest.java **/server/worker/task/sqoop/SqoopTaskTest.java - **/server/worker/EnvFileTest.java **/service/process/ProcessServiceTest.java **/service/quartz/cron/CronUtilsTest.java **/service/zk/DefaultEnsembleProviderTest.java