Browse Source

[1.3.7-prepare][Fix-5596][Python] Fix conflict between python_home and datax_home configuration in dolphinscheduler_env.sh #5612 (#5901)

Co-authored-by: Kirs <acm_master@163.com>
Shiwen Cheng 3 years ago committed by GitHub
parent
commit
c4a82a73eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 72
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
  2. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
  3. 80
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java
  4. 58
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutorTest.java
  5. 34
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
  6. 2
      pom.xml

72
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java

@ -14,24 +14,30 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker.task; package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*; import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* python command executor * python command executor
@ -47,7 +53,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
* python * python
*/ */
public static final String PYTHON = "python"; public static final String PYTHON = "python";
private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$");
/** /**
* constructor * constructor
@ -110,64 +116,70 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
} }
/** /**
* get python home * Gets the path of the Python command to execute
* @return python home * @return python command path
*/ */
@Override @Override
protected String commandInterpreter() { protected String commandInterpreter() {
String pythonHome = getPythonHome(taskExecutionContext.getEnvFile()); String pythonHome = getPythonHome(taskExecutionContext.getEnvFile());
if (StringUtils.isEmpty(pythonHome)){ return getPythonCommand(pythonHome);
}
/**
* get python command
*
* @param pythonHome python home
* @return python command
*/
public static String getPythonCommand(String pythonHome) {
if (StringUtils.isEmpty(pythonHome)) {
return PYTHON; return PYTHON;
} }
File file = new File(pythonHome);
if (file.exists() && file.isFile()) {
return pythonHome; return pythonHome;
} }
if (PYTHON_PATH_PATTERN.matcher(pythonHome).find()) {
return pythonHome;
}
return Paths.get(pythonHome, "/bin/python").toString();
}
/** /**
* get the absolute path of the Python command * get python home
* note :
* common.properties
* PYTHON_HOME configured under common.properties is Python absolute path, not PYTHON_HOME itself
*
* for example :
* your PYTHON_HOME is /opt/python3.7/
* you must set PYTHON_HOME to /opt/python3.7/python under common.properties
* dolphinscheduler.env.path file.
* *
* @param envPath env path * @param envPath env path
* @return python home * @return python home
*/ */
private static String getPythonHome(String envPath){ public static String getPythonHome(String envPath) {
BufferedReader br = null; BufferedReader br = null;
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
try { try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath))); br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));
String line; String line;
while ((line = br.readLine()) != null){ while ((line = br.readLine()) != null) {
if (line.contains(Constants.PYTHON_HOME)){ if (line.contains(Constants.PYTHON_HOME)) {
sb.append(line); sb.append(line);
break; break;
} }
} }
String result = sb.toString(); String result = sb.toString();
if (org.apache.commons.lang.StringUtils.isEmpty(result)){ if (StringUtils.isEmpty(result)) {
return null; return null;
} }
String[] arrs = result.split(Constants.EQUAL_SIGN); String[] arrs = result.split(Constants.EQUAL_SIGN);
if (arrs.length == 2){ if (arrs.length == 2) {
return arrs[1]; return arrs[1];
} }
} catch (IOException e) {
}catch (IOException e){ logger.error("read file failure", e);
logger.error("read file failure",e); } finally {
}finally {
try { try {
if (br != null){ if (br != null) {
br.close(); br.close();
} }
} catch (IOException e) { } catch (IOException e) {
logger.error(e.getMessage(),e); logger.error(e.getMessage(), e);
} }
} }
return null; return null;

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java

@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker.task.datax;
package org.apache.dolphinscheduler.server.worker.task.datax;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
@ -44,6 +45,7 @@ import java.io.File;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermission;
@ -58,6 +60,8 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -72,7 +76,6 @@ import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser; import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
/** /**
* DataX task * DataX task
*/ */
@ -82,6 +85,7 @@ public class DataxTask extends AbstractTask {
* python process(datax only supports version 2.7 by default) * python process(datax only supports version 2.7 by default)
*/ */
private static final String DATAX_PYTHON = "python2.7"; private static final String DATAX_PYTHON = "python2.7";
private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$");
/** /**
* datax path * datax path
@ -366,7 +370,7 @@ public class DataxTask extends AbstractTask {
// datax python command // datax python command
StringBuilder sbr = new StringBuilder(); StringBuilder sbr = new StringBuilder();
sbr.append(DATAX_PYTHON); sbr.append(getPythonCommand());
sbr.append(" "); sbr.append(" ");
sbr.append(DATAX_PATH); sbr.append(DATAX_PATH);
sbr.append(" "); sbr.append(" ");
@ -392,6 +396,23 @@ public class DataxTask extends AbstractTask {
return fileName; return fileName;
} }
/**
 * Resolves the python command used to launch DataX from the {@code PYTHON_HOME}
 * environment variable of the current process.
 *
 * @return whatever {@link #getPythonCommand(String)} yields for that variable's value
 *         (which may be {@code null} when the variable is not set)
 */
public String getPythonCommand() {
    return getPythonCommand(System.getenv("PYTHON_HOME"));
}
/**
 * Builds the python command used to launch DataX for a given python home.
 *
 * <ul>
 *   <li>Empty or {@code null} home: the bare {@code DATAX_PYTHON} command name is returned.</li>
 *   <li>Home that matches {@code PYTHON_PATH_PATTERN} (i.e. it already points at an
 *       interpreter): the matched suffix is replaced with {@code "/bin/" + DATAX_PYTHON}.</li>
 *   <li>Anything else: {@code "/bin/" + DATAX_PYTHON} is appended to the home.</li>
 * </ul>
 *
 * @param pythonHome configured python home, or a path to a python interpreter; may be empty
 * @return the python command to run DataX with
 */
public String getPythonCommand(String pythonHome) {
    if (StringUtils.isEmpty(pythonHome)) {
        return DATAX_PYTHON;
    }
    final String dataxInterpreter = "/bin/" + DATAX_PYTHON;
    final Matcher interpreterSuffix = PYTHON_PATH_PATTERN.matcher(pythonHome);
    // An existing interpreter suffix is swapped for the DataX one; a plain home gets it appended.
    return interpreterSuffix.find()
            ? interpreterSuffix.replaceAll(dataxInterpreter)
            : Paths.get(pythonHome, dataxInterpreter).toString();
}
/** /**
* parsing synchronized column names in SQL statements * parsing synchronized column names in SQL statements
* *

80
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java

@ -1,80 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker;
import org.apache.commons.lang.StringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Smoke test that reads the PYTHON_HOME entry from the environment script located under the
 * current working directory and logs it.
 */
public class EnvFileTest {

    private static final Logger logger = LoggerFactory.getLogger(EnvFileTest.class);

    /**
     * Looks up PYTHON_HOME in {@code <user.dir>/script/env/dolphinscheduler_env.sh} and logs the
     * value that was found (which may be {@code null}).
     */
    @Test
    public void test() {
        String path = System.getProperty("user.dir") + "/script/env/dolphinscheduler_env.sh";
        String pythonHome = getPythonHome(path);
        logger.info(pythonHome);
    }

    /**
     * get python home
     *
     * <p>Returns the text after the {@code =} sign of the first line that contains
     * {@code PYTHON_HOME}. Only that first matching line is inspected.
     *
     * @param path path of the env file to scan
     * @return the configured value, or {@code null} when the path is empty, the file cannot be
     *         read, no line mentions PYTHON_HOME, or the matching line does not split into
     *         exactly two parts around {@code =}
     */
    private static String getPythonHome(String path) {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        // try-with-resources closes the reader on every exit path; the charset is fixed so the
        // result does not depend on the platform default encoding.
        try (BufferedReader br = new BufferedReader(new InputStreamReader(
                new FileInputStream(path), java.nio.charset.StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                if (line.contains("PYTHON_HOME")) {
                    String[] arrs = line.split("=");
                    return arrs.length == 2 ? arrs[1] : null;
                }
            }
        } catch (IOException e) {
            logger.error("read file failed", e);
        }
        return null;
    }
}

58
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutorTest.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Tests for the static python-home / python-command helpers of {@link PythonCommandExecutor}.
 */
public class PythonCommandExecutorTest {

    private static final Logger logger = LoggerFactory.getLogger(PythonCommandExecutorTest.class);

    /**
     * The env script shipped under {@code script/env} must yield a PYTHON_HOME value.
     */
    @Test
    public void testGetPythonHome() {
        // Drop the module directory segment (if present) so the path points at the project-level script.
        String envFile = (System.getProperty("user.dir") + "/script/env/dolphinscheduler_env.sh")
                .replace("dolphinscheduler-server/", "");
        String pythonHome = PythonCommandExecutor.getPythonHome(envFile);
        logger.info(pythonHome);
        Assert.assertNotNull(pythonHome);
    }

    /**
     * Checks the command derived from empty homes, interpreter paths and plain home directories.
     */
    @Test
    public void testGetPythonCommand() {
        // no home configured: plain command name
        Assert.assertEquals(PythonCommandExecutor.PYTHON, PythonCommandExecutor.getPythonCommand(null));
        Assert.assertEquals(PythonCommandExecutor.PYTHON, PythonCommandExecutor.getPythonCommand(""));

        // already an interpreter path: returned unchanged
        Assert.assertEquals("/usr/bin/python",
                PythonCommandExecutor.getPythonCommand("/usr/bin/python"));
        Assert.assertEquals("/usr/local/bin/python2",
                PythonCommandExecutor.getPythonCommand("/usr/local/bin/python2"));
        Assert.assertEquals("/opt/python/bin/python3.8",
                PythonCommandExecutor.getPythonCommand("/opt/python/bin/python3.8"));

        // home directory: /bin/python is appended
        Assert.assertEquals("/opt/soft/python/bin/python",
                PythonCommandExecutor.getPythonCommand("/opt/soft/python"));
        Assert.assertEquals("/opt/soft/python-3.8/bin/python",
                PythonCommandExecutor.getPythonCommand("/opt/soft/python-3.8"));
    }
}

34
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java

@ -14,16 +14,11 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker.task.datax;
package org.apache.dolphinscheduler.server.worker.task.datax;
import java.lang.reflect.Method; import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
@ -36,6 +31,13 @@ import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -46,7 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS; import com.alibaba.fastjson.JSONObject;
/** /**
* DataxTask Tester. * DataxTask Tester.
@ -393,4 +395,20 @@ public class DataxTaskTest {
} }
} }
/**
 * Verifies how the DataX python command is derived from a python home.
 */
@Test
public void testGetPythonCommand() {
    // The no-arg overload reads PYTHON_HOME from the process environment, so it is compared
    // against the explicit overload instead of assuming the variable is unset on this machine.
    String pythonCommand = dataxTask.getPythonCommand();
    Assert.assertEquals(dataxTask.getPythonCommand(System.getenv("PYTHON_HOME")), pythonCommand);

    // no home configured: bare command name
    pythonCommand = dataxTask.getPythonCommand((String) null);
    Assert.assertEquals("python2.7", pythonCommand);
    pythonCommand = dataxTask.getPythonCommand("");
    Assert.assertEquals("python2.7", pythonCommand);

    // interpreter paths: the interpreter name is replaced with python2.7
    pythonCommand = dataxTask.getPythonCommand("/usr/bin/python");
    Assert.assertEquals("/usr/bin/python2.7", pythonCommand);
    pythonCommand = dataxTask.getPythonCommand("/usr/local/bin/python2");
    Assert.assertEquals("/usr/local/bin/python2.7", pythonCommand);
    pythonCommand = dataxTask.getPythonCommand("/opt/python/bin/python3.8");
    Assert.assertEquals("/opt/python/bin/python2.7", pythonCommand);

    // home directory: /bin/python2.7 is appended
    pythonCommand = dataxTask.getPythonCommand("/opt/soft/python");
    Assert.assertEquals("/opt/soft/python/bin/python2.7", pythonCommand);
}
} }

2
pom.xml

@ -839,10 +839,10 @@
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include> <include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
<include>**/server/worker/sql/SqlExecutorTest.java</include> <include>**/server/worker/sql/SqlExecutorTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include> <include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/EnvFileTest.java</include> <include>**/server/worker/task/EnvFileTest.java</include>
<include>**/server/worker/task/http/HttpTaskTest.java</include> <include>**/server/worker/task/http/HttpTaskTest.java</include>
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include> <include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/worker/EnvFileTest.java</include>
<include>**/service/process/ProcessServiceTest.java</include> <include>**/service/process/ProcessServiceTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include> <include>**/service/quartz/cron/CronUtilsTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include> <include>**/service/zk/DefaultEnsembleProviderTest.java</include>

Loading…
Cancel
Save