Browse Source

Use DATAX_PYTHON to specify a datax python version (#13849)

(cherry picked from commit 752452ecbd2f8094b819eae219f3eec0f6a6427b)
3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
a1861bc2df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      docs/docs/en/guide/task/datax.md
  2. 2
      docs/docs/zh/guide/task/datax.md
  3. 28
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
  4. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java

2
docs/docs/en/guide/task/datax.md

@ -4,6 +4,8 @@
DataX task type for executing DataX programs. For DataX nodes, the worker will execute `${DATAX_HOME}/bin/datax.py` to analyze the input json file. DataX task type for executing DataX programs. For DataX nodes, the worker will execute `${DATAX_HOME}/bin/datax.py` to analyze the input json file.
By default, the datax.py will be executed by python2.7, if you want to use other python version, you can set the `DATAX_PYTHON` environment variable to specify a version.
## Create Task ## Create Task
- Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page. - Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.

2
docs/docs/zh/guide/task/datax.md

@ -4,6 +4,8 @@
DataX 任务类型,用于执行 DataX 程序。对于 DataX 节点,worker 会通过执行 `${DATAX_HOME}/bin/datax.py` 来解析传入的 json 文件。 DataX 任务类型,用于执行 DataX 程序。对于 DataX 节点,worker 会通过执行 `${DATAX_HOME}/bin/datax.py` 来解析传入的 json 文件。
默认会使用python2.7去执行datax.py,如果需要使用其他版本的python去执行datax.py,需要在环境变量中配置`DATAX_PYTHON`。
## 创建任务 ## 创建任务
- 点击项目管理 -> 项目名称 -> 工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面; - 点击项目管理 -> 项目名称 -> 工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面;

28
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

@ -40,14 +40,12 @@ import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.SystemUtils;
import java.io.File; import java.io.File;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermission;
@ -60,10 +58,9 @@ import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.alibaba.druid.sql.ast.SQLStatement; import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
@ -87,9 +84,9 @@ public class DataxTask extends AbstractTask {
public static final String CUSTOM_PARAM = " -D%s='%s'"; public static final String CUSTOM_PARAM = " -D%s='%s'";
/** /**
* python process(datax only supports version 2.7 by default) * python process(datax only supports version 2.7 by default)
* todo: Create a shell script to execute the datax task, and read the python version from the env, so we can support multiple versions of datax python
*/ */
private static final String DATAX_PYTHON = "python2.7"; private static final String DATAX_PYTHON = Optional.ofNullable(System.getenv("DATAX_PYTHON")).orElse("python2.7");
private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$");
/** /**
* select all * select all
@ -399,7 +396,7 @@ public class DataxTask extends AbstractTask {
} }
// datax python command // datax python command
String sbr = getPythonCommand() + String sbr = DATAX_PYTHON +
" " + " " +
DATAX_PATH + DATAX_PATH +
" " + " " +
@ -441,23 +438,6 @@ public class DataxTask extends AbstractTask {
return customParameters; return customParameters;
} }
public String getPythonCommand() {
String pythonHome = System.getenv("PYTHON_HOME");
return getPythonCommand(pythonHome);
}
public String getPythonCommand(String pythonHome) {
if (StringUtils.isEmpty(pythonHome)) {
return DATAX_PYTHON;
}
String pythonBinPath = "/bin/" + DATAX_PYTHON;
Matcher matcher = PYTHON_PATH_PATTERN.matcher(pythonHome);
if (matcher.find()) {
return matcher.replaceAll(pythonBinPath);
}
return Paths.get(pythonHome, pythonBinPath).toString();
}
public String loadJvmEnv(DataxParameters dataXParameters) { public String loadJvmEnv(DataxParameters dataXParameters) {
int xms = Math.max(dataXParameters.getXms(), 1); int xms = Math.max(dataXParameters.getXms(), 1);
int xmx = Math.max(dataXParameters.getXmx(), 1); int xmx = Math.max(dataXParameters.getXmx(), 1);

10
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java

@ -245,16 +245,6 @@ public class DataxTaskTest {
} }
} }
@Test
public void testGetPythonCommand() {
Assertions.assertEquals(dataxTask.getPythonCommand(""), "python2.7");
Assertions.assertEquals(dataxTask.getPythonCommand("/bin/python"), "/bin/python2.7");
String pythonCommand = dataxTask.getPythonCommand("/opt/python");
pythonCommand = pythonCommand.replace(File.separator, "/");
Assertions.assertEquals(pythonCommand, "/opt/python/bin/python2.7");
}
@Test @Test
public void testLoadJvmEnv() { public void testLoadJvmEnv() {
DataxParameters dataXParameters = createDataxParameters(); DataxParameters dataXParameters = createDataxParameters();

Loading…
Cancel
Save