Browse Source

[Feature] New task plugin Pytorch (#11498)

Co-authored-by: Eric Gao <ericgao.apache@gmail.com>
3.1.0-release
JieguangZhou 2 years ago committed by GitHub
parent
commit
a974ba74ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      docs/configs/docsdev.js
  2. 118
      docs/docs/en/guide/task/pytorch.md
  3. 112
      docs/docs/zh/guide/task/pytorch.md
  4. BIN
      docs/img/tasks/demo/pytorch_en.png
  5. BIN
      docs/img/tasks/demo/pytorch_note_en.png
  6. BIN
      docs/img/tasks/icons/pytorch.png
  7. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
  8. 49
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/pom.xml
  9. 82
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/GitProjectManager.java
  10. 77
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PythonEnvManager.java
  11. 96
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchParameters.java
  12. 132
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
  13. 49
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskChannel.java
  14. 45
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskChannelFactory.java
  15. 252
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java
  16. 1
      dolphinscheduler-task-plugin/pom.xml
  17. BIN
      dolphinscheduler-ui/public/images/task-icons/pytorch.png
  18. BIN
      dolphinscheduler-ui/public/images/task-icons/pytorch_hover.png
  19. 13
      dolphinscheduler-ui/src/locales/en_US/project.ts
  20. 13
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  21. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
  22. 138
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-pytorch.ts
  23. 10
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  24. 4
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
  25. 88
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pytorch.ts
  26. 8
      dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
  27. 5
      dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
  28. 6
      dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

8
docs/configs/docsdev.js

@ -189,6 +189,10 @@ export default {
title: 'ChunJun',
link: '/en-us/docs/dev/user_doc/guide/task/chunjun.html',
},
{
title: 'Pytorch',
link: '/en-us/docs/dev/user_doc/guide/task/pytorch.html',
},
],
},
{
@ -805,6 +809,10 @@ export default {
title: 'ChunJun',
link: '/zh-cn/docs/dev/user_doc/guide/task/chunjun.html',
},
{
title: 'Pytorch',
link: '/zh-cn/docs/dev/user_doc/guide/task/pytorch.html',
},
],
},
{

118
docs/docs/en/guide/task/pytorch.md

@ -0,0 +1,118 @@
# Pytorch Node (experimental)
## Overview
[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.
`Pytorch` task plugin enables users to run `Pytorch` projects in DolphinScheduler more conveniently. In addition, it supports handy Python environment management.
Compared with the `Python task plugin`, the `Pytorch task plugin` allows users to **conveniently use existing Python environments or create new ones (using `Virtualenv` or `Conda`)**. **It also supports running Python projects (local or Git projects)** instead of just Python scripts.
## Create Task
- Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.
- Drag <img src="../../../../img/tasks/icons/pytorch.png" width="15"/> from the toolbar to the canvas.
## Task Example
The task plugin picture is as follows
![pytorch](../../../../img/tasks/demo/pytorch_en.png)
First, introduce some general parameters of DolphinScheduler:
- **Node name**: The node name in a workflow definition is unique.
- **Run flag**: Identifies whether this node is scheduled normally. If it does not need to be executed, select
the `prohibition execution` option.
- **Descriptive information**: Describe the function of the node.
- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high
to low, and tasks with the same priority will execute in a first-in first-out order.
- **Worker grouping**: Assign tasks to the machines of the worker group to execute. If `Default` is selected,
randomly select a worker machine for execution.
- **Environment Name**: Configure the name of the environment in which to run the script.
- **Times of failed retry attempts**: The number of times to resubmit the task after it fails.
- **Failed retry interval**: The time interval (unit minute) for resubmitting the task after a failed task.
- **Delayed execution time**: The time (unit minute) that a task delays in execution.
- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task run time exceeds the "timeout", an alarm
email will be sent and the task execution will fail.
- **Resource**: Refers to the list of resource files that need to be called in the script, and the files uploaded or created in Resource Center - File Management.
- **User-defined parameters**: User-defined parameters local to this task, which replace the `${variable}` placeholders in the script.
- **Predecessor task**: Selecting a predecessor task for the current task will set the selected predecessor task as
upstream of the current task.
Here are some specific parameters for the Pytorch plugin:
#### Run time parameters
- **Python Script**: The entry Python script file that you want to run.
- **Script Input Parameters**: Input parameters passed to the script at run time.
The two parameters above are the minimum configuration needed to run a task. Additional optional parameters are provided as follows; when you choose to expand more configurations, you can configure them.
- **Project Path**: Sets the environment variable `PYTHONPATH` so that the Python package/project code at this path can be loaded when running Python scripts. Both local paths and Git URLs are supported.
- If it is a local path, it is used as the environment variable `PYTHONPATH`.
- If it is a Git URL (prefixed with `git@`, `https://` or `http://`), the project will be downloaded and the download location will be set as the new `Project Path`. If you need to run a folder under the project, append `#subdirectory` to the Git URL.
#### Python environment parameters
- **Create An Environment Or Not** :Whether to create a new Python environment to run the task.
*no*
- **Python Command Path**: Such as `/usr/bin/python`. The default value is `${PYTHON_HOME}` in the environment.
*yes*
- **Python Environment Manager Tool**: You can choose `virtualenv` or `conda`.
- If you choose `virtualenv`, a new environment is created with `virtualenv`, using the command `virtualenv -p ${PYTHON_HOME} venv`.
- If you choose `conda`, a new environment is created with `conda`, and you need to specify the Python version.
- **Requirement File**: The default is requirements.txt.
If `Project Path` is set and contains the Python script or the requirement file, relative paths can be used for `Python Script` and `Requirement File`.
#### Demo
Suppose we want to run the mnist subproject under `https://github.com/pytorch/examples`.
We can configure the task as shown below:
![pytorch_note](../../../../img/tasks/demo/pytorch_note_en.png)
In addition, if the code is stored in the `Resource`, you can use the `Resource` parameter to download the code, and write the related parameters into the path of the corresponding resource.
## Environment configuration
The environment configuration mainly depends on the choice of runtime Python environment. You need to configure the corresponding environment variables in the `Security` - `Environment Manage`.
### Specifying a Python path
It is applicable to the Python environment where the project has been run on the worker, so you can directly configure the `Python Command Path` as the corresponding Python environment in the component. If you do not know the environment address, you can use `which python` to obtain it.
### Create a new environment using `Conda`
It applies to a new environment to run the project. You need to create an environment in `Security` - `Environment Manage`. You can change the environment to the actual environment by referring to the following.
```shell
# Add the directory for the conda command
export PATH=$HOME/anaconda3/bin:$PATH
```
### Create a new environment using `Virtualenv`
It applies to a new environment to run the project. You need to create an environment in `Security` - `Environment Manage`. You can change the environment to the actual environment by referring to the following.
```shell
# Add the directory for the virtualenv command
export PATH=/home/xxx/anaconda3/bin:$PATH
export PYTHON_HOME=/usr/local/bin/python3.7
```
## Other
This task plugin can also run XGBoost, LightGBM, SkLearn, TensorFlow, Keras and other projects. This task plugin is available as an upgrade to Python task plugin running machine learning tasks.
If necessary, we can call this task plugin `PythonML`, which can run machine learning projects easily in DolphinScheduler.

112
docs/docs/zh/guide/task/pytorch.md

@ -0,0 +1,112 @@
# Pytorch 节点(试验版)
## 综述
[Pytorch](https://pytorch.org) 是一个主流的 Python 机器学习库。
为了用户能够在DolphinScheduler中**更方便的运行Pytorch项目**,实现了Pytorch任务组件。主要提供**便捷的python环境管理**以及支持**运行python项目**。
与Python任务组件不同,该组件允许用户快速使用已有python环境或者创建新的python环境(使用virtualenv或者conda);支持运行Python项目(本地项目或者Git项目)而非只是python脚本。
## 创建任务
- 点击项目管理-项目名称-工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面;
- 拖动工具栏的 <img src="../../../../img/tasks/icons/pytorch.png" width="15"/> 任务节点到画板中。
## 任务样例
组件图示如下:
![pytorch](../../../../img/tasks/demo/pytorch_en.png)
### 首先介绍一些DS通用参数
- **节点名称** :设置任务的名称。一个工作流定义中的节点名称是唯一的。
- **运行标志** :标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
- **描述** :描述该节点的功能。
- **任务优先级** :worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
- **Worker 分组** :任务分配给 worker 组的机器执行,选择 Default,会随机选择一台 worker 机执行。
- **环境名称** :配置运行脚本的环境。
- **失败重试次数** :任务失败重新提交的次数。
- **失败重试间隔** :任务失败重新提交任务的时间间隔,以分钟为单位。
- **延迟执行时间** :任务延迟执行的时间,以分钟为单位。
- **资源**:是指脚本中需要调用的资源文件列表,资源中心-文件管理上传或创建的文件。
- **自定义参数**:是 SHELL 局部的用户自定义参数,会替换脚本中以 `${变量}` 的内容。
- **前置任务** :选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
### Pytorch组件独有的参数
#### 运行参数
- **python脚本** :需要运行的python脚本文件入口。
- **脚本启动参数** :运行时的输入参数。
以上为两个最小化配置运行的参数,另外提供其他的一些配置参数如下可选,当选择展开更多配置时,可以配置更多参数。
- **python项目地址** :设置`PYTHONPATH`环境变量,设置后运行python脚本时可以加载该地址下的python包/项目代码。支持本地路径或者Git url
- 若为本地路径,作为`PYTHONPATH`环境变量。
- 如果为Git URL (以`git@ | https:// | http:// `前缀),则会下载项目,并将下载后存放地址作为新的**python项目地址**,若需要运行子文件夹下的项目,可以添加 `#subdirectory` 来配置。
#### python环境参数
- **是否创建新环境** :是否创建新的python环境来运行该任务。
*否*
- **python命令路径** :如`/usr/bin/python`,默认为DS环境配置中的`${PYTHON_HOME}`。
*是*
- **python环境管理工具** :可以选择virtualenv或者conda。
- 若选择`virtualenv`,则会用`virtualenv`创建一个新环境,使用命令 `virtualenv -p ${PYTHON_HOME} venv` 创建
- 若选择`conda`, 则会使用`conda` 创建一个新环境,并需要指定创建的python版本。
- **依赖文件** :默认为 requirements.txt。
配置了`python项目地址`参数,那么`python脚本`和`依赖文件`参数允许输入相对路径
#### Demo
如现在需要运行 https://github.com/pytorch/examples 项目下的mnist的子项目。
可以设置
![pytorch_note](../../../../img/tasks/demo/pytorch_note_en.png)
另外如果代码存放在资源中心,则可以使用`资源`参数下载代码,并将相关参数写成对应资源的路径即可。
## 环境配置
环境配置主要取决于运行时python环境的选择,需要在`安全中心`-`环境管理`中配置对应需要的环境变量即可。
### 指定python路径
适用于worker上已经有运行该项目的python环境,那么可以直接在组件中配置`python命令路径`为对应的python环境即可,如果不知道该环境地址,可以使用`which python`获取。
### 使用Conda创建新环境
适用于新建环境运行该项目,需要在`安全中心`-`环境管理`中创建环境, 参考如下添加修改为实际环境即可。
```shell
# conda命令对应的目录加入PATH中
export PATH=$HOME/anaconda3/bin:$PATH
```
### 使用virtualenv创建新环境
适用于新建环境运行该项目,需要在`安全中心`-`环境管理`中创建环境, 参考如下添加修改为实际环境即可。
```shell
# virtualenv命令对应的目录加入PATH中
export PATH=/home/lucky/anaconda3/bin:$PATH
export PYTHON_HOME=/usr/local/bin/python3.7
```
## 其他
本组件也可以运行xgboost, lightgbm, sklearn, tensorflow, keras 等项目。本组件可作为python组件运行机器学习任务的升级组件。
如果有需要,后续建议可以统一涵盖为PythonML组件,来运行机器学习项目。

BIN
docs/img/tasks/demo/pytorch_en.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 45 KiB

BIN
docs/img/tasks/demo/pytorch_note_en.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 101 KiB

BIN
docs/img/tasks/icons/pytorch.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.6 KiB

6
dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml

@ -195,6 +195,12 @@
<artifactId>dolphinscheduler-task-sagemaker</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-pytorch</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

49
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/pom.xml

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven module for the Pytorch task plugin; inherits from the task-plugin parent POM. -->
<parent>
<artifactId>dolphinscheduler-task-plugin</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>dev-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-task-pytorch</artifactId>
<packaging>jar</packaging>
<!-- No versions are declared below; they are presumably managed by a parent POM (not visible here). -->
<!-- All dependencies use the "provided" scope: needed to compile, but not bundled into this jar. -->
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

82
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/GitProjectManager.java

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.pytorch;
import static org.apache.dolphinscheduler.plugin.task.api.AbstractShell.ExitCodeException;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import java.io.File;
import java.nio.file.Paths;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.Data;
/**
 * Clones a Git-hosted project into the task working directory and resolves the
 * local path that should be used as the project root.
 *
 * <p>The configured {@code path} has the form {@code <git url>[#<subdirectory>]}:
 * the part before {@code #} is the repository to clone, the optional part after it
 * is a directory inside the clone.
 */
@Data
public class GitProjectManager {

    /** Name of the directory (below {@code baseDir}) the repository is cloned into. */
    public static final String GIT_PATH_LOCAL = "GIT_PROJECT";

    /** A path is treated as a Git URL when it starts with {@code git@}, {@code http://} or {@code https://}. */
    private static final Pattern GIT_CHECK_PATTERN = Pattern.compile("^(git@|https?://)");

    /** Separates the repository URL from the optional sub-directory. */
    private static final String SUBDIRECTORY_SEPARATOR = "#";

    protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));

    /** Raw project path as configured by the user, e.g. {@code https://host/repo#subdir}. */
    private String path;

    /** Directory in which {@link #GIT_PATH_LOCAL} is created; defaults to the current directory. */
    private String baseDir = ".";

    /**
     * Tells whether {@code path} looks like a Git URL.
     *
     * @param path candidate path, may be {@code null}
     * @return {@code true} if the path starts with a recognised Git prefix; {@code false} otherwise,
     *         including for {@code null}
     */
    public static boolean isGitPath(String path) {
        return path != null && GIT_CHECK_PATTERN.matcher(path).find();
    }

    /**
     * Clones the repository into {@code <baseDir>/GIT_PROJECT}.
     *
     * <p>A non-zero exit code from {@code git} is tolerated when the target directory
     * already exists; otherwise the failure is rethrown.
     *
     * @throws Exception if the command cannot be executed, or exits non-zero while the
     *                   target directory is missing
     */
    public void prepareProject() throws Exception {
        String savePath = Paths.get(baseDir, GIT_PATH_LOCAL).toString();
        logger.info("clone project {} to {}", path, savePath);
        // Pass the arguments as a vector rather than through "sh -c": the URL is user input and must
        // never be interpreted by a shell. "--" stops git from reading the URL as an option.
        String[] command = {"git", "clone", "--", getGitUrl(), savePath};
        try {
            OSUtils.exeShell(command);
        } catch (ExitCodeException e) {
            if (!new File(savePath).exists()) {
                throw e;
            }
        }
        logger.info("clone project done");
    }

    /**
     * @return the repository URL, i.e. {@code path} without the {@code #subdirectory} suffix
     */
    public String getGitUrl() {
        if (!path.contains(SUBDIRECTORY_SEPARATOR)) {
            return path;
        }
        // split() drops trailing empty strings, so "#" alone yields an empty array.
        String[] parts = path.split(SUBDIRECTORY_SEPARATOR);
        return parts.length > 0 ? parts[0] : "";
    }

    /**
     * @return the local project root: {@code <baseDir>/GIT_PROJECT}, extended by the
     *         sub-directory when one is given after {@code #}
     */
    public String getGitLocalPath() {
        String subdirectory = "";
        if (path.contains(SUBDIRECTORY_SEPARATOR)) {
            // "url#" has no second element after split(); treat it as "no sub-directory".
            String[] parts = path.split(SUBDIRECTORY_SEPARATOR);
            if (parts.length > 1) {
                subdirectory = parts[1];
            }
        }
        String gitLocalPath = Paths.get(GIT_PATH_LOCAL, subdirectory).toString();
        return Paths.get(baseDir, gitLocalPath).toString();
    }
}

77
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PythonEnvManager.java

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.pytorch;
import lombok.Data;
/**
 * Builds the shell snippets used to create and activate a throw-away Python
 * environment ({@code ./venv}) with either {@code virtualenv} or {@code conda}.
 */
@Data
public class PythonEnvManager {

    public static final String ENV_TOOL_VENV = "virtualenv";

    public static final String ENV_TOOL_CONDA = "conda";

    private static final String PATTERN_ENVIRONMENT_PYTHON = "python[\\d\\.]*$";

    private static final String PATTERN_ENVIRONMENT_REQUIREMENT = "\\.txt$";

    /** Location of the environment that gets created, relative to the working directory. */
    private static final String CREATE_ENV_NAME = "./venv";

    private static final String CONDA_SOURCE = "source activate %s";

    private static final String CONDA_BUILD = "conda create -y python=%s -p %s";

    private static final String VIRTUALENV_SOURCE = "source %s/bin/activate";

    private static final String VIRTUALENV_BUILD = "virtualenv -p ${PYTHON_HOME} %s";

    private static final String INSTALL_COMMAND = "python -m pip install -r %s";

    /** Tool used to create the environment; one of the {@code ENV_TOOL_*} constants. */
    private String pythonEnvTool = ENV_TOOL_VENV;

    /** Python version requested from conda; only used by the conda build command. */
    private String condaPythonVersion = "3.9";

    /**
     * Assembles the "create environment, activate it, install requirements" command line.
     *
     * @param requirementPath path of the requirements file handed to {@code pip install -r}
     * @return the three steps joined with {@code " && "}; the create and activate steps are
     *         empty strings when {@code pythonEnvTool} is neither virtualenv nor conda
     */
    public String getBuildEnvCommand(String requirementPath) {
        String activateStep = getSourceEnvCommand(CREATE_ENV_NAME);
        String createStep;
        switch (pythonEnvTool) {
            case ENV_TOOL_VENV:
                createStep = String.format(VIRTUALENV_BUILD, CREATE_ENV_NAME);
                break;
            case ENV_TOOL_CONDA:
                createStep = String.format(CONDA_BUILD, condaPythonVersion, CREATE_ENV_NAME);
                break;
            default:
                createStep = "";
                break;
        }
        String installStep = String.format(INSTALL_COMMAND, requirementPath);
        return String.join(" && ", createStep, activateStep, installStep);
    }

    /**
     * Returns the activation command matching the configured tool.
     *
     * @param envName path of the environment to activate
     * @return the activation command, or an empty string for an unrecognised tool
     */
    private String getSourceEnvCommand(String envName) {
        switch (pythonEnvTool) {
            case ENV_TOOL_VENV:
                return String.format(VIRTUALENV_SOURCE, envName);
            case ENV_TOOL_CONDA:
                return String.format(CONDA_SOURCE, envName);
            default:
                return "";
        }
    }

    /**
     * @return path of the Python interpreter inside the created environment
     */
    public String getPythonCommand() {
        return CREATE_ENV_NAME + "/bin/python";
    }
}

96
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchParameters.java

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.pytorch;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.nio.file.Paths;
import java.util.List;
import lombok.Data;
/**
 * Parameters of a Pytorch task as deserialized from the task definition JSON.
 */
@Data
public class PytorchParameters extends AbstractParameters {

    /** Interpreter placeholder used when no explicit Python command is configured. */
    private static final String DEFAULT_PYTHON_COMMAND = "${PYTHON_HOME}";

    /** Whether a fresh Python environment should be created before running the script. */
    private Boolean isCreateEnvironment = false;

    /** Project root: a local directory or a Git URL; exported as {@code PYTHONPATH}. */
    private String pythonPath = ".";

    /** Entry script to run. */
    private String script;

    /** Command-line arguments appended after the script. */
    private String scriptParams;

    private String pythonCommand = DEFAULT_PYTHON_COMMAND;

    private String pythonEnvTool = PythonEnvManager.ENV_TOOL_VENV;

    private String requirements = "requirements.txt";

    private String condaPythonVersion = "3.9";

    /**
     * resource list
     */
    private List<ResourceInfo> resourceList;

    @Override
    public List<ResourceInfo> getResourceFilesList() {
        return resourceList;
    }

    /**
     * Validates the parameters.
     *
     * @return {@code true} only when an entry script is configured; without one the
     *         task has nothing to run and {@link #getScriptPath()} would fail on a null path
     */
    @Override
    public boolean checkParameters() {
        return StringUtils.isNotBlank(script);
    }

    /**
     * @return the requirements file path, resolved against {@code pythonPath} when applicable
     */
    public String getRequirementPath() {
        return getPossiblePath(requirements);
    }

    /**
     * @return the configured Python command, or {@code ${PYTHON_HOME}} when it is
     *         {@code null}, empty or whitespace only
     */
    public String getPythonCommand() {
        // The field can be null or blank when the JSON carries an explicit null / "" value.
        return StringUtils.isBlank(pythonCommand) ? DEFAULT_PYTHON_COMMAND : pythonCommand;
    }

    /**
     * @return the entry script path, resolved against {@code pythonPath} when applicable
     */
    public String getScriptPath() {
        return getPossiblePath(script);
    }

    /**
     * Resolves {@code filePath} relative to {@code pythonPath} when that is where the file lives.
     *
     * <p>The prefixed path is chosen when it exists on disk while the plain path does not, or,
     * failing that, when the resource list contains an entry named {@code "/" + prefixedPath}
     * (with a leading {@code "./"} stripped). Otherwise the path is returned unchanged.
     *
     * @param filePath path as entered by the user
     * @return the path to use on the command line
     */
    private String getPossiblePath(String filePath) {
        String possiblePath = filePath;
        File sourceFile = new File(possiblePath);
        String newPath = Paths.get(pythonPath, possiblePath).toString();
        File newFile = new File(newPath);
        if (newFile.exists() && !sourceFile.exists()) {
            possiblePath = newPath;
        } else if (resourceList != null) {
            String newPathResource = StringUtils.removeStart(newPath, "./");
            for (ResourceInfo resourceInfo : resourceList) {
                if (resourceInfo.getResourceName().equals("/" + newPathResource)) {
                    possiblePath = newPath;
                    break;
                }
            }
        }
        return possiblePath;
    }
}

132
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.pytorch;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Task executor that runs a Python script, optionally inside a freshly created
 * Python environment and optionally from a Git-hosted project, through a shell command.
 */
public class PytorchTask extends AbstractTaskExecutor {

    private final ShellCommandExecutor shellCommandExecutor;

    protected PytorchParameters pytorchParameters;

    protected TaskExecutionContext taskExecutionContext;

    private PythonEnvManager pythonEnvManager;

    /**
     * @param taskExecutionContext execution context supplying the task parameters and working directory
     */
    public PytorchTask(TaskExecutionContext taskExecutionContext) {
        super(taskExecutionContext);
        this.taskExecutionContext = taskExecutionContext;
        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
                taskExecutionContext,
                logger);
    }

    /**
     * Parses and validates the task parameters and configures the environment manager.
     *
     * @throws TaskException if the parameters cannot be parsed or fail validation
     */
    @Override
    public void init() {
        logger.info("python task params {}", taskExecutionContext.getTaskParams());

        pytorchParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PytorchParameters.class);

        // Guard against a null parse result so that a bad payload surfaces as a TaskException
        // instead of a NullPointerException.
        if (pytorchParameters == null || !pytorchParameters.checkParameters()) {
            throw new TaskException("python task params is not valid");
        }

        pythonEnvManager = new PythonEnvManager();
        pythonEnvManager.setPythonEnvTool(pytorchParameters.getPythonEnvTool());
        pythonEnvManager.setCondaPythonVersion(pytorchParameters.getCondaPythonVersion());
    }

    /**
     * Builds the command, runs it and copies the result (exit code, app ids, process id,
     * var pool) onto this task. On any exception the exit code is set to failure and the
     * exception is rethrown.
     */
    @Override
    public void handle() throws Exception {
        try {
            String command = buildPythonExecuteCommand();
            TaskResponse taskResponse = shellCommandExecutor.run(command);
            setExitStatusCode(taskResponse.getExitStatusCode());
            setAppIds(taskResponse.getAppIds());
            setProcessId(taskResponse.getProcessId());
            setVarPool(shellCommandExecutor.getVarPool());
        } catch (Exception e) {
            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
            throw e;
        }
    }

    /**
     * Assembles the shell script to execute, one command per line:
     * export {@code PYTHONPATH}, optionally build the environment, then run the script.
     *
     * <p>When the project path is a Git URL the repository is cloned first and the
     * parameters' project path is replaced by the local clone location.
     *
     * @return the script with parameter placeholders substituted
     * @throws Exception if cloning the Git project fails
     */
    public String buildPythonExecuteCommand() throws Exception {
        List<String> args = new ArrayList<>();

        String pythonPath = pytorchParameters.getPythonPath();
        if (GitProjectManager.isGitPath(pythonPath)) {
            GitProjectManager gpm = new GitProjectManager();
            gpm.setPath(pythonPath);
            gpm.setBaseDir(taskExecutionContext.getExecutePath());
            gpm.prepareProject();
            pytorchParameters.setPythonPath(gpm.getGitLocalPath());
        }

        args.add(String.format("export PYTHONPATH=%s", pytorchParameters.getPythonPath()));

        if (isCreateEnvironment()) {
            String buildEnvCommand = pythonEnvManager.getBuildEnvCommand(pytorchParameters.getRequirementPath());
            args.add(buildEnvCommand);
        }

        String scriptParams = pytorchParameters.getScriptParams();
        if (scriptParams != null && !scriptParams.isEmpty()) {
            args.add(String.format("%s %s %s", getPythonCommand(), pytorchParameters.getScriptPath(), pytorchParameters.getScriptParams()));
        } else {
            args.add(String.format("%s %s", getPythonCommand(), pytorchParameters.getScriptPath()));
        }

        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
        return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParamUtils.convert(paramsMap));
    }

    /**
     * Null-safe view of the boxed "create environment" flag; a missing value counts as {@code false}.
     */
    private boolean isCreateEnvironment() {
        return Boolean.TRUE.equals(pytorchParameters.getIsCreateEnvironment());
    }

    /**
     * @return the interpreter of the created environment when one is created,
     *         otherwise the interpreter configured in the parameters
     */
    private String getPythonCommand() {
        return isCreateEnvironment()
                ? pythonEnvManager.getPythonCommand()
                : pytorchParameters.getPythonCommand();
    }

    @Override
    public AbstractParameters getParameters() {
        return pytorchParameters;
    }
}

49
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskChannel.java

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.pytorch;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
/**
 * {@link TaskChannel} implementation for the Pytorch task: creates task instances
 * and parses their parameters.
 */
public class PytorchTaskChannel implements TaskChannel {

    /**
     * No-op: this channel performs no action on cancellation.
     */
    @Override
    public void cancelApplication(boolean status) {
        // intentionally left empty
    }

    /**
     * @param taskRequest execution context of the task to create
     * @return a new {@link PytorchTask} bound to that context
     */
    @Override
    public PytorchTask createTask(TaskExecutionContext taskRequest) {
        final PytorchTask task = new PytorchTask(taskRequest);
        return task;
    }

    /**
     * @param parametersNode node carrying the task parameters as JSON
     * @return the parameters deserialized as {@link PytorchParameters}
     */
    @Override
    public AbstractParameters parseParameters(ParametersNode parametersNode) {
        final String taskParams = parametersNode.getTaskParams();
        return JSONUtils.parseObject(taskParams, PytorchParameters.class);
    }

    /**
     * @return always {@code null}
     */
    @Override
    public ResourceParametersHelper getResources(String parameters) {
        return null;
    }
}

45
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskChannelFactory.java

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.pytorch;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.Collections;
import java.util.List;
import com.google.auto.service.AutoService;
/**
 * Factory registered through {@code @AutoService} that exposes the Pytorch task channel
 * under the name {@code PYTORCH}.
 */
@AutoService(TaskChannelFactory.class)
public class PytorchTaskChannelFactory implements TaskChannelFactory {

    /** Name under which this task type is exposed. */
    private static final String TASK_NAME = "PYTORCH";

    /**
     * @return a new {@link PytorchTaskChannel}
     */
    @Override
    public TaskChannel create() {
        final TaskChannel channel = new PytorchTaskChannel();
        return channel;
    }

    /**
     * @return the task type name, {@code "PYTORCH"}
     */
    @Override
    public String getName() {
        return TASK_NAME;
    }

    /**
     * @return an empty list; this factory declares no plugin parameters
     */
    @Override
    public List<PluginParams> getParams() {
        final List<PluginParams> none = Collections.emptyList();
        return none;
    }
}

252
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java

@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.pytorch;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.File;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Date;
import java.util.Set;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
import org.powermock.modules.junit4.PowerMockRunner;
/**
 * Unit tests for the Pytorch task plugin.
 *
 * <p>Covers the environment build command produced by {@link PythonEnvManager},
 * git path detection and parsing in {@link GitProjectManager}, and the shell
 * command assembled by {@link PytorchTask#buildPythonExecuteCommand()} for the
 * different parameter combinations.
 *
 * <p>All {@code assertEquals} calls follow the JUnit {@code (expected, actual)}
 * argument order so that failure messages report the values correctly.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({JSONUtils.class, PropertyUtils.class,})
@PowerMockIgnore({"javax.*"})
@SuppressStaticInitializationFor("org.apache.dolphinscheduler.spi.utils.PropertyUtils")
public class PytorchTaskTest {

    private final String pythonPath = ".";

    private final String requirementPath = "requirements.txt";

    @Before
    public void before() {
        PowerMockito.mockStatic(PropertyUtils.class);
    }

    /**
     * Checks the environment build command for conda (two python versions) and virtualenv.
     */
    @Test
    public void testPythonEnvManager() {
        PythonEnvManager envManager = new PythonEnvManager();

        envManager.setPythonEnvTool(PythonEnvManager.ENV_TOOL_CONDA);
        envManager.setCondaPythonVersion("3.9");
        String condaEnvCommand39 = envManager.getBuildEnvCommand(requirementPath);
        Assert.assertEquals(
                "conda create -y python=3.9 -p ./venv && source activate ./venv && python -m pip install -r " + requirementPath,
                condaEnvCommand39);

        envManager.setCondaPythonVersion("3.8");
        String condaEnvCommand38 = envManager.getBuildEnvCommand(requirementPath);
        Assert.assertEquals(
                "conda create -y python=3.8 -p ./venv && source activate ./venv && python -m pip install -r " + requirementPath,
                condaEnvCommand38);

        envManager.setPythonEnvTool(PythonEnvManager.ENV_TOOL_VENV);
        String venvEnvCommand = envManager.getBuildEnvCommand(requirementPath);
        Assert.assertEquals(
                "virtualenv -p ${PYTHON_HOME} ./venv && source ./venv/bin/activate && python -m pip install -r " + requirementPath,
                venvEnvCommand);
    }

    /**
     * Checks git path detection and the url / local path derived from a git path
     * with and without a {@code #sub-directory} suffix.
     */
    @Test
    public void testGitProject() {
        assertFalse(GitProjectManager.isGitPath("dolphinscheduler/test"));
        assertFalse(GitProjectManager.isGitPath("/dolphinscheduler/test"));
        assertTrue(GitProjectManager.isGitPath("https://github.com/apache/dolphinscheduler.git"));
        assertTrue(GitProjectManager.isGitPath("git@github.com:apache/dolphinscheduler.git"));
        assertTrue(GitProjectManager.isGitPath("git@github.com:apache/dolphinscheduler.git#doc"));

        GitProjectManager gpm1 = new GitProjectManager();
        gpm1.setPath("git@github.com:apache/dolphinscheduler.git#doc");
        Assert.assertEquals("git@github.com:apache/dolphinscheduler.git", gpm1.getGitUrl());
        Assert.assertEquals("./GIT_PROJECT/doc", gpm1.getGitLocalPath());

        GitProjectManager gpm2 = new GitProjectManager();
        gpm2.setPath("git@github.com:apache/dolphinscheduler.git");
        Assert.assertEquals("git@github.com:apache/dolphinscheduler.git", gpm2.getGitUrl());
        Assert.assertEquals("./GIT_PROJECT", gpm2.getGitLocalPath());
    }

    /**
     * Without environment creation the command uses the configured python command,
     * and {@code ${PYTHON_HOME}} when the python command is unset or empty.
     */
    @Test
    public void testBuildPythonCommandWithoutCreateEnvironment() throws Exception {
        PytorchParameters parameters = new PytorchParameters();
        parameters.setScript("main.py");
        parameters.setScriptParams("--epochs=1 --dry-run");

        PytorchTask task1 = initTask(parameters);
        Assert.assertEquals(
                "export PYTHONPATH=.\n" +
                        "${PYTHON_HOME} main.py --epochs=1 --dry-run",
                task1.buildPythonExecuteCommand());

        parameters.setPythonCommand("");
        PytorchTask task2 = initTask(parameters);
        Assert.assertEquals(
                "export PYTHONPATH=.\n" +
                        "${PYTHON_HOME} main.py --epochs=1 --dry-run",
                task2.buildPythonExecuteCommand());

        parameters.setPythonCommand("/usr/bin/python");
        PytorchTask task3 = initTask(parameters);
        Assert.assertEquals(
                "export PYTHONPATH=.\n" +
                        "/usr/bin/python main.py --epochs=1 --dry-run",
                task3.buildPythonExecuteCommand());
    }

    /**
     * With environment creation through conda, the env build command is inserted
     * and the script is run with the python binary of the created environment.
     */
    @Test
    public void testBuildPythonCommandWithCreateCondeEnv() throws Exception {
        PytorchParameters parameters = new PytorchParameters();
        parameters.setPythonPath(pythonPath);
        parameters.setIsCreateEnvironment(true);
        parameters.setCondaPythonVersion("3.6");
        parameters.setPythonEnvTool(PythonEnvManager.ENV_TOOL_CONDA);
        parameters.setRequirements("requirements.txt");
        parameters.setScript("main.py");
        parameters.setScriptParams("--epochs=1 --dry-run");

        PytorchTask task = initTask(parameters);
        Assert.assertEquals(
                "export PYTHONPATH=.\n" +
                        "conda create -y python=3.6 -p ./venv && source activate ./venv && python -m pip install -r requirements.txt\n" +
                        "./venv/bin/python main.py --epochs=1 --dry-run",
                task.buildPythonExecuteCommand());
    }

    /**
     * With environment creation through virtualenv, the env build command is inserted
     * and the script is run with the python binary of the created environment.
     */
    @Test
    public void testBuildPythonCommandWithCreateVenvEnv() throws Exception {
        PytorchParameters parameters = new PytorchParameters();
        parameters.setPythonPath(pythonPath);
        parameters.setIsCreateEnvironment(true);
        parameters.setPythonEnvTool(PythonEnvManager.ENV_TOOL_VENV);
        parameters.setRequirements("requirements.txt");
        parameters.setScript("main.py");
        parameters.setScriptParams("--epochs=1 --dry-run");

        PytorchTask task = initTask(parameters);
        Assert.assertEquals(
                "export PYTHONPATH=.\n" +
                        "virtualenv -p ${PYTHON_HOME} ./venv && source ./venv/bin/activate && python -m pip install -r requirements.txt\n" +
                        "./venv/bin/python main.py --epochs=1 --dry-run",
                task.buildPythonExecuteCommand());
    }

    /**
     * When the requirement file and the script exist below the python path,
     * the command is expected to reference them through that path.
     */
    @Test
    public void testGetPossiblePath() throws Exception {
        String requirements = "requirements.txt";
        String script = "train.py";
        String pythonPath = Paths.get("/tmp", UUID.randomUUID().toString()).toString();

        PytorchParameters parameters = new PytorchParameters();
        parameters.setRequirements(requirements);
        parameters.setScript(script);
        parameters.setPythonPath(pythonPath);
        parameters.setIsCreateEnvironment(true);
        parameters.setPythonEnvTool(PythonEnvManager.ENV_TOOL_VENV);

        PytorchTask task = initTask(parameters);

        String requirementFile = Paths.get(pythonPath, requirements).toString();
        String scriptFile = Paths.get(pythonPath, script).toString();
        createFile(requirementFile);
        createFile(scriptFile);

        String expected = "export PYTHONPATH=%s\n" +
                "virtualenv -p ${PYTHON_HOME} ./venv && source ./venv/bin/activate && python -m pip install -r %s\n" +
                "./venv/bin/python %s";
        Assert.assertEquals(
                String.format(expected, pythonPath, requirementFile, scriptFile),
                task.buildPythonExecuteCommand());
    }

    /**
     * Creates a {@link PytorchTask} on top of a mocked execution context and initializes it.
     */
    private PytorchTask initTask(PytorchParameters pytorchParameters) {
        TaskExecutionContext taskExecutionContext = createContext(pytorchParameters);
        PytorchTask task = new PytorchTask(taskExecutionContext);
        task.init();
        return task;
    }

    /**
     * Builds a mocked {@link TaskExecutionContext} whose task params are the JSON form
     * of the given parameters, and registers it in the {@link TaskExecutionContextCacheManager}.
     *
     * @param pytorchParameters parameters serialized into the context's task params
     * @return the mocked execution context
     */
    public TaskExecutionContext createContext(PytorchParameters pytorchParameters) {
        String parameters = JSONUtils.toJsonString(pytorchParameters);
        TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
        Mockito.when(taskExecutionContext.getTaskLogName()).thenReturn("PytorchTest");

        String appId = UUID.randomUUID().toString();
        String folder = String.format("/tmp/dolphinscheduler_PytorchTest_%s", appId);
        Mockito.when(taskExecutionContext.getExecutePath()).thenReturn(folder);
        Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn(appId);
        Mockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date());
        Mockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
        Mockito.when(taskExecutionContext.getLogPath()).thenReturn(folder + "/log");
        Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);

        String environmentConfig = "export PATH=$HOME/anaconda3/bin:$PATH\n" + "export PYTHON_HOME=/bin/python";
        Mockito.when(taskExecutionContext.getEnvironmentConfig()).thenReturn(environmentConfig);

        // The tenant is the current OS user (may be null when USER is not set).
        String userName = System.getenv().get("USER");
        Mockito.when(taskExecutionContext.getTenantCode()).thenReturn(userName);

        TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
        return taskExecutionContext;
    }

    /**
     * Creates an empty file, creating missing parent directories first.
     * On non-Windows systems the file is created with the {@code RWXR_XR_X} permissions.
     * An already existing file is tolerated.
     */
    private void createFile(String fileName) throws Exception {
        Path path = new File(fileName).toPath();

        // The parent directory is needed on every platform, not only on POSIX systems.
        Path parentDir = path.getParent();
        if (parentDir != null) {
            Files.createDirectories(parentDir);
        }

        try {
            if (SystemUtils.IS_OS_WINDOWS) {
                Files.createFile(path);
            } else {
                Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
                FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
                Files.createFile(path, attr);
            }
        } catch (FileAlreadyExistsException ex) {
            // this is expected
        }
    }
}

1
dolphinscheduler-task-plugin/pom.xml

@ -60,6 +60,7 @@
<module>dolphinscheduler-task-sagemaker</module>
<module>dolphinscheduler-task-chunjun</module>
<module>dolphinscheduler-task-flink-stream</module>
<module>dolphinscheduler-task-pytorch</module>
</modules>
<dependencyManagement>

BIN
dolphinscheduler-ui/public/images/task-icons/pytorch.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.6 KiB

BIN
dolphinscheduler-ui/public/images/task-icons/pytorch_hover.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

13
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -753,6 +753,17 @@ export default {
dinky_address_tips: 'Please enter the url of your dinky',
dinky_task_id: 'Dinky task id',
dinky_task_id_tips: 'Please enter the task id of your dinky',
dinky_online: 'Online task'
dinky_online: 'Online task',
pytorch_script: 'Python Script',
pytorch_script_params: 'Script Input Parameters',
pytorch_other_params: 'Show More Configurations',
pytorch_python_path: 'Project Path',
pytorch_is_create_environment: 'Create An Environment Or Not',
pytorch_python_command: 'Python Command Path',
pytorch_python_command_tips: 'If empty,will be set $PYTHON_HOME',
pytorch_python_env_tool: 'Python Environment Manager Tool',
pytorch_requirements: 'Requirement File',
pytorch_conda_python_version: 'Python Version',
pytorch_conda_python_version_tips: 'Please enter the version number, such as 3.6, 3.7, 3.x'
}
}

13
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -738,6 +738,17 @@ export default {
dinky_address_tips: '请输入 Dinky 地址',
dinky_task_id: 'dinky 作业ID',
dinky_task_id_tips: '请输入作业 ID',
dinky_online: '是否上线作业'
dinky_online: '是否上线作业',
pytorch_script: 'python脚本',
pytorch_script_params: '脚本启动参数',
pytorch_other_params: '展开更多配置',
pytorch_python_path: 'python项目地址',
pytorch_is_create_environment: '是否创建新环境',
pytorch_python_command: 'python命令路径',
pytorch_python_command_tips: '若为空,则使用$PYTHON_HOME',
pytorch_python_env_tool: 'python环境管理工具',
pytorch_requirements: '依赖文件',
pytorch_conda_python_version: 'python版本',
pytorch_conda_python_version_tips: '请输入版本号,如 3.6, 3.7, 3.x等'
}
}

1
dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts

@ -76,3 +76,4 @@ export { useDinky } from './use-dinky'
export { useSagemaker } from './use-sagemaker'
export { useChunjun } from './use-chunjun'
export { useChunjunDeployMode } from './use-chunjun-deploy-mode'
export { usePytorch } from './use-pytorch'

138
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-pytorch.ts

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { useI18n } from 'vue-i18n'
import type { IJsonItem } from '../types'
import { watch, ref } from 'vue'
import { useCustomParams, useResources } from '.'
/**
 * Builds the Pytorch-specific form items of the task node form.
 *
 * The items for script and script parameters and the "other params" switch
 * always use a span of 24. The remaining Pytorch items use reactive spans that
 * are recomputed whenever `model.isCreateEnvironment`, `model.pythonEnvTool`
 * or `model.otherParams` changes; a span of 0 is assigned to items that should
 * not be displayed (presumably the form renderer hides such items).
 *
 * Side effect: the helper flags `showCreateEnvironment`, `showCreateConda`
 * and `showCreateEnv` are written onto `model`.
 *
 * @param model reactive task model that is read and updated by the form
 * @returns the Pytorch items followed by the resources item and the custom parameter items
 */
export function usePytorch(model: { [field: string]: any }): IJsonItem[] {
  const { t } = useI18n()

  const isCreateEnvironmentSpan = ref(0)
  const pythonPathSpan = ref(0)
  const pythonEnvToolSpan = ref(0)
  const pythonCommandSpan = ref(0)
  const requirementsSpan = ref(0)
  const condaPythonVersionSpan = ref(0)

  // Derive the visibility flags from the switches and the selected env tool.
  const setFlag = () => {
    model.showCreateEnvironment = model.isCreateEnvironment && model.otherParams
    model.showCreateConda = Boolean(
      model.showCreateEnvironment && model.pythonEnvTool === 'conda'
    )
    model.showCreateEnv = Boolean(
      model.showCreateEnvironment && model.pythonEnvTool === 'virtualenv'
    )
  }

  // Translate the flags into spans.
  const resetSpan = () => {
    isCreateEnvironmentSpan.value = model.otherParams ? 12 : 0
    pythonPathSpan.value = model.otherParams ? 24 : 0
    pythonEnvToolSpan.value = model.showCreateEnvironment ? 12 : 0
    // The python command is only asked for when no environment is created.
    // Logical operators are used on purpose: bitwise `~`/`&` give wrong
    // results as soon as a flag is truthy but not a strict boolean.
    pythonCommandSpan.value =
      !model.showCreateEnvironment && model.otherParams ? 12 : 0
    requirementsSpan.value = model.showCreateEnvironment ? 24 : 0
    condaPythonVersionSpan.value = model.showCreateConda ? 24 : 0
  }

  watch(
    () => [model.isCreateEnvironment, model.pythonEnvTool, model.otherParams],
    () => {
      setFlag()
      resetSpan()
    }
  )

  return [
    {
      type: 'input',
      field: 'script',
      name: t('project.node.pytorch_script'),
      span: 24
    },
    {
      type: 'input',
      field: 'scriptParams',
      name: t('project.node.pytorch_script_params'),
      span: 24
    },
    {
      type: 'switch',
      field: 'otherParams',
      name: t('project.node.pytorch_other_params'),
      span: 24
    },
    {
      type: 'input',
      field: 'pythonPath',
      name: t('project.node.pytorch_python_path'),
      span: pythonPathSpan
    },
    {
      type: 'switch',
      field: 'isCreateEnvironment',
      name: t('project.node.pytorch_is_create_environment'),
      span: isCreateEnvironmentSpan
    },
    {
      type: 'input',
      field: 'pythonCommand',
      name: t('project.node.pytorch_python_command'),
      span: pythonCommandSpan,
      props: {
        placeholder: t('project.node.pytorch_python_command_tips')
      }
    },
    {
      type: 'select',
      field: 'pythonEnvTool',
      name: t('project.node.pytorch_python_env_tool'),
      span: pythonEnvToolSpan,
      options: PYTHON_ENV_TOOL
    },
    {
      type: 'input',
      field: 'requirements',
      name: t('project.node.pytorch_requirements'),
      span: requirementsSpan
    },
    {
      type: 'input',
      field: 'condaPythonVersion',
      name: t('project.node.pytorch_conda_python_version'),
      span: condaPythonVersionSpan,
      props: {
        placeholder: t('project.node.pytorch_conda_python_version_tips')
      }
    },
    useResources(),
    ...useCustomParams({ model, field: 'localParams', isSimple: false })
  ]
}
/**
 * Select options for the python environment tool; each option uses the
 * tool name both as its label and as its value.
 */
export const PYTHON_ENV_TOOL = ['conda', 'virtualenv'].map((tool) => ({
  label: tool,
  value: tool
}))

10
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -386,6 +386,16 @@ export function formatParams(data: INodeData): {
if (data.taskType === 'SAGEMAKER') {
taskParams.sagemakerRequestJson = data.sagemakerRequestJson
}
if (data.taskType === 'PYTORCH') {
taskParams.script = data.script
taskParams.scriptParams = data.scriptParams
taskParams.pythonPath = data.pythonPath
taskParams.isCreateEnvironment = data.isCreateEnvironment
taskParams.pythonCommand = data.pythonCommand
taskParams.pythonEnvTool = data.pythonEnvTool
taskParams.requirements = data.requirements
taskParams.condaPythonVersion = data.condaPythonVersion
}
if (data.taskType === 'DINKY') {
taskParams.address = data.address

4
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts

@ -43,6 +43,7 @@ import { useDvc } from './use-dvc'
import { useDinky } from './use-dinky'
import { userSagemaker } from './use-sagemaker'
import { useChunjun } from './use-chunjun'
import { usePytorch } from './use-pytorch'
export default {
SHELL: useShell,
@ -72,5 +73,6 @@ export default {
DINKY: useDinky,
SAGEMAKER: userSagemaker,
CHUNJUN: useChunjun,
FLINK_STREAM: useFlinkStream
FLINK_STREAM: useFlinkStream,
PYTORCH: usePytorch
}

88
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pytorch.ts

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive } from 'vue'
import * as Fields from '../fields/index'
import type { IJsonItem, INodeData, ITaskData } from '../types'
/**
 * Assembles the form description and the reactive model of a PYTORCH task node.
 *
 * @param projectCode code of the project, passed on to the field builders that need it
 * @param from when equal to 1, the task type and process name items are added after the name item
 * @param readonly passed on to the task type item
 * @param data existing task data; only `id` and `processName` are read here
 * @returns `json` (the ordered form items) and `model` (the reactive model with its defaults)
 */
export function usePytorch({
  projectCode,
  from = 0,
  readonly,
  data
}: {
  projectCode: number
  from?: number
  readonly?: boolean
  data?: ITaskData
}) {
  // Defaults of a new PYTORCH task.
  const model = reactive({
    name: '',
    taskType: 'PYTORCH',
    flag: 'YES',
    description: '',
    timeoutFlag: false,
    localParams: [],
    environmentCode: null,
    failRetryInterval: 1,
    failRetryTimes: 0,
    workerGroup: 'default',
    delayTime: 0,
    timeout: 30,
    timeoutNotifyStrategy: ['WARN'],
    pythonEnvTool: 'conda',
    pythonCommand: '${PYTHON_HOME}',
    condaPythonVersion: '3.7',
    requirements: 'requirements.txt',
    pythonPath: '.'
  } as INodeData)

  // Items that are only present when `from` is 1.
  const extra: IJsonItem[] =
    from === 1
      ? [
          Fields.useTaskType(model, readonly),
          Fields.useProcessName({
            model,
            projectCode,
            isCreate: !data?.id,
            from,
            processName: data?.processName
          })
        ]
      : []

  const json = [
    Fields.useName(from),
    ...extra,
    Fields.useRunFlag(),
    Fields.useDescription(),
    Fields.useTaskPriority(),
    Fields.useWorkerGroup(),
    // NOTE(review): `model.id` is not set in this function, while the process
    // name item above uses `!data?.id` - confirm which one is intended.
    Fields.useEnvironmentName(model, !model.id),
    ...Fields.useTaskGroup(model, projectCode),
    ...Fields.useFailed(),
    ...Fields.useResourceLimit(),
    Fields.useDelayTime(model),
    ...Fields.useTimeoutAlarm(model),
    ...Fields.usePytorch(model),
    Fields.usePreTasks()
  ] as IJsonItem[]

  return { json, model }
}

8
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@ -364,6 +364,14 @@ interface ITaskParams {
taskId?: string
online?: boolean
sagemakerRequestJson?: string
script?: string
scriptParams?: string
pythonPath?: string
isCreateEnvironment?: string
pythonCommand?: string
pythonEnvTool?: string
requirements?: string
condaPythonVersion?: string
}
interface INodeData

5
dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts

@ -43,6 +43,7 @@ export type TaskType =
| 'SAGEMAKER'
| 'CHUNJUN'
| 'FLINK_STREAM'
| 'PYTORCH'
export type TaskExecuteType = 'STREAM' | 'BATCH'
@ -146,6 +147,10 @@ export const TASK_TYPES_MAP = {
alias: 'FLINK_STREAM',
helperLinkDisable: true,
taskExecuteType: 'STREAM'
},
PYTORCH: {
alias: 'Pytorch',
helperLinkDisable: true
}
} as {
[key in TaskType]: {

6
dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

@ -183,6 +183,9 @@ $bgLight: #ffffff;
&.icon-chunjun {
background-image: url('/images/task-icons/chunjun.png');
}
&.icon-pytorch {
background-image: url('/images/task-icons/pytorch.png');
}
}
&:hover {
@ -269,6 +272,9 @@ $bgLight: #ffffff;
&.icon-chunjun {
background-image: url('/images/task-icons/chunjun_hover.png');
}
&.icon-pytorch {
background-image: url('/images/task-icons/pytorch_hover.png');
}
}
}
}

Loading…
Cancel
Save