
[Feature][Task Plugin] Add DVC task plugin for MLops scenario (#10372) (#10407)

3.1.0-release
JieguangZhou 2 years ago committed by GitHub
parent
commit
36e20cdfc8
  1. 125
      docs/docs/en/guide/task/dvc.md
  2. 110
      docs/docs/zh/guide/task/dvc.md
  3. BIN
      docs/img/tasks/demo/dvc_download.png
  4. BIN
      docs/img/tasks/demo/dvc_env_config.png
  5. BIN
      docs/img/tasks/demo/dvc_env_name.png
  6. BIN
      docs/img/tasks/demo/dvc_init.png
  7. BIN
      docs/img/tasks/demo/dvc_upload.png
  8. BIN
      docs/img/tasks/icons/dvc.png
  9. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
  10. 46
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/pom.xml
  11. 56
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java
  12. 105
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java
  13. 163
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
  14. 51
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannel.java
  15. 58
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannelFactory.java
  16. 30
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java
  17. 153
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
  18. 1
      dolphinscheduler-task-plugin/pom.xml
  19. BIN
      dolphinscheduler-ui/public/images/task-icons/dvc.png
  20. BIN
      dolphinscheduler-ui/public/images/task-icons/dvc_hover.png
  21. 10
      dolphinscheduler-ui/src/locales/en_US/project.ts
  22. 10
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  23. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
  24. 171
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dvc.ts
  25. 11
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  26. 4
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
  27. 82
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts
  28. 7
      dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
  29. 5
      dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
  30. 6
      dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

125
docs/docs/en/guide/task/dvc.md

@ -0,0 +1,125 @@
# DVC Node
## Overview
[DVC (Data Version Control)](https://dvc.org) is an excellent open-source version control system for machine learning projects.
The DVC plugin brings DVC's data version management to DolphinScheduler, so that users can version their data from within a workflow.
The plugin provides the following three functions:
- Init DVC: Initialize a Git repository as a DVC repository and bind the remote storage address that holds the actual data.
- Upload: Add or update specific data to the repository and record the version tag.
- Download: Download a specific version of data from the repository.
## Create Task
- Click `Project Management -> Project Name -> Workflow Definition`, and click the "Create Workflow" button to enter the
DAG editing page.
- Drag the <img src="../../../../img/tasks/icons/dvc.png" width="15"/> task node from the toolbar to the canvas.
## Task Example
First, introduce some general parameters of DolphinScheduler:
- **Node name**: The name of the task. Node names must be unique within a workflow definition.
- **Run flag**: Indicates whether the node is scheduled normally. If the node does not need to run, select
the `prohibition execution`.
- **Descriptive information**: Describes the function of the node.
- **Task priority**: When worker threads are insufficient, tasks run in order of priority from high
to low, and tasks with the same priority run in first-in, first-out order.
- **Worker grouping**: Assigns the task to the machines of a worker group. If `Default` is selected,
a worker machine is chosen at random.
- **Environment Name**: The environment in which the script runs.
- **Times of failed retry attempts**: The number of times a failed task is resubmitted.
- **Failed retry interval**: The interval, in minutes, between resubmissions of a failed task.
- **Delayed execution time**: The time, in minutes, by which the task's execution is delayed.
- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs longer than the "timeout", an alarm
email is sent and the task execution fails.
- **Predecessor task**: Selecting a predecessor task for the current task sets the selected task as
upstream of the current task.
Here are some specific parameters for the DVC plugin:
- **DVC Task Type**: Upload, Download, or Init DVC.
- **DVC Repository**: The DVC repository address associated with the task execution.
### Init DVC
Initialize the Git repository as a DVC repository and add a new data remote to save data.
After the project is initialized, it is still a Git repository, but with DVC features added.
The data is not actually stored in a Git repository, but somewhere else, and DVC keeps track of the version and address of the data and handles this relationship.
![dvc_init](../../../../img/tasks/demo/dvc_init.png)
**Task Parameter**
- **Remote Store Url**: The address where the actual data is stored. See [DVC supported storage types](https://dvc.org/doc/command-reference/remote/add#supported-storage-types) for the storage types you can use.
The example above shows that:
Initialize repository `git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git` as a DVC project and bind the remote storage address to `~/dvc`.
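For orientation, the Init DVC task comes down to a handful of shell commands. The sketch below follows the command templates in `DvcConstants.java` from this commit. The function name `dvc_init_repo` and its two positional arguments are illustrative and not part of the plugin, and the sketch checks for `dvc` with `command -v` where the plugin uses `which`, so the script the plugin actually generates may differ in detail.

```shell
# Sketch of the Init DVC flow; dvc_init_repo is a hypothetical helper name.
# $1: Git URL of the DVC repository, $2: remote store URL (for example ~/dvc).
# The body runs in a subshell so the caller's working directory is untouched.
dvc_init_repo() (
    repo="$1"
    store_url="$2"
    # Fail early when the dvc executable cannot be found.
    command -v dvc >/dev/null 2>&1 || { echo "dvc does not exist"; exit 1; }
    # Clone the Git repository and work inside the clone.
    git clone "$repo" dvc-repository || exit 1
    cd dvc-repository || exit 1
    # Add DVC metadata, then register the store as the default remote "origin".
    dvc init || exit 1
    dvc remote add origin "$store_url" -d || exit 1
    # Publish the DVC configuration through Git.
    git commit -am "init dvc project and add remote"
    git push
)

# Example call with the values from the example above:
# dvc_init_repo "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git" ~/dvc
```

Nothing runs until the function is called, so the block can be pasted into a shell session and inspected first.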
### Upload
Used to upload and update data and record version numbers.
![dvc_upload](../../../../img/tasks/demo/dvc_upload.png)
**Task Parameter**
- **Data Path in DVC Repository**: The path in the repository to which the data is uploaded.
- **Data Path In Worker**: The path on the worker of the data to upload.
- **Version**: The version tag for the uploaded data, saved as a `git tag`.
- **Version Message**: A message describing this version.
The example above shows that:
Upload data `/home/data/iris` to the root directory of repository `git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git`. The file or folder of data is named `iris`.
Then run `git tag "iris_1.0" -m "init iris data"` to record the version tag `iris_1.0` and the version message `init iris data`.
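The Upload flow can be sketched the same way. The commands below are taken from the templates in `DvcConstants.java`. The wrapper `dvc_upload` and its argument order are assumptions made for illustration, and quoting has been added around the variables, which the plugin's templates do not do.

```shell
# Sketch of the Upload flow; dvc_upload is a hypothetical helper name.
# $1: repository URL        $2: data path on the worker
# $3: target path in repo   $4: version tag   $5: version message
dvc_upload() (
    repo="$1"; data_path="$2"; data_location="$3"; version="$4"; message="$5"
    command -v dvc >/dev/null 2>&1 || { echo "dvc does not exist"; exit 1; }
    git clone "$repo" dvc-repository || exit 1
    cd dvc-repository || exit 1
    # Let DVC stage the files it writes (.dvc, .gitignore) in Git automatically.
    dvc config core.autostage true --local || exit 1
    # Track the data and transfer it directly to the default remote.
    dvc add "$data_path" -v -o "$data_location" --to-remote || exit 1
    # Commit the metadata, tag the version, and push branches and tags.
    git commit -am "$message"
    git tag "$version" -m "$message"
    git push --all
    git push --tags
)

# Example call with the values from the example above:
# dvc_upload "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git" \
#     /home/data/iris iris iris_1.0 "init iris data"
```

Only the small `.dvc` metadata file goes into Git; the data itself is sent to the remote store configured by Init DVC.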
### Download
Used to download data for a specific version.
![dvc_download](../../../../img/tasks/demo/dvc_download.png)
**Task Parameter**
- **Data Path in DVC Repository**: The path of the data to download in the DVC repository.
- **Data Path In Worker**: The local path on the worker where the downloaded data is saved.
- **Version**: The version of the data to download.
The example above shows that:
Download the iris data at version `iris_1.0` from repository `git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git` to `~/dvc_test/iris`.
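Download needs no clone, because `dvc get` reads the repository by URL. The sketch below follows the `DVC_DOWNLOAD` template in `DvcConstants.java`. The wrapper name `dvc_download` and its argument order are assumptions made for illustration.

```shell
# Sketch of the Download flow; dvc_download is a hypothetical helper name.
# $1: repository URL   $2: data path in the repository
# $3: save path on the worker   $4: version (a Git tag such as iris_1.0)
dvc_download() (
    repo="$1"; data_location="$2"; save_path="$3"; version="$4"
    command -v dvc >/dev/null 2>&1 || { echo "dvc does not exist"; exit 1; }
    # Fetch the data as it was at the given revision and write it to save_path.
    dvc get "$repo" "$data_location" -o "$save_path" -v --rev "$version"
)

# Example call with the values from the example above:
# dvc_download "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git" \
#     iris ~/dvc_test/iris iris_1.0
```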
## Environment to prepare
### Install DVC
Make sure DVC is installed. If it is not, run `pip install dvc` to install it.
Get the `dvc` path and configure the environment variables.
The following uses a conda environment as an example:
DVC is installed with pip in a conda Python environment, and conda's environment variables are configured so that the component can find the `dvc` command.
```shell
which dvc
# >> ~/anaconda3/bin/dvc
```
Log in with the admin account to configure a conda environment variable (please
install [anaconda](https://docs.continuum.io/anaconda/install/)
or [miniconda](https://docs.conda.io/en/latest/miniconda.html#installing) in advance).
![dvc_env_config](../../../../img/tasks/demo/dvc_env_config.png)
Note: When configuring the task, select the conda environment created above. Otherwise, the program cannot find the
conda environment.
![dvc_env_name](../../../../img/tasks/demo/dvc_env_name.png)
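As a text version of what such an environment configuration might contain, the sketch below uses the `export` line that `DvcTaskTest` in this commit passes as its environment config. The directory `$HOME/anaconda3/bin` is only an example and depends on where conda is installed, and `check_dvc` is a hypothetical helper for verifying the result, not part of DolphinScheduler.

```shell
# Example environment configuration: put the conda bin directory first on PATH.
# Adjust the directory to match your own conda installation.
export PATH=$HOME/anaconda3/bin:$PATH

# Hypothetical helper: print where dvc resolves, or fail with the same
# message the plugin's generated script uses.
check_dvc() {
    command -v dvc || { echo "dvc does not exist"; return 1; }
}

# Usage: run `check_dvc` on the worker as the tenant user that executes tasks.
```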

110
docs/docs/zh/guide/task/dvc.md

@ -0,0 +1,110 @@
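# DVC Node
## Overview
[DVC (Data Version Control)](https://dvc.org) is an excellent open-source version control system for machine learning in the MLOps field.
The DVC component brings DVC's data version management to DS, helping users manage data versions easily. The component provides the following three functions:
- Init DVC: Initialize a Git repository as a DVC repository and bind the storage address used to hold the actual data.
- Upload: Add or update specific data in the repository and record the version number.
- Download: Download a specific version of the data from the repository.
## Create Task
- Click Project Management - Project Name - Workflow Definition, and click the "Create Workflow" button to enter the DAG editing page.
- Drag the <img src="../../../../img/tasks/icons/dvc.png" width="15"/> task node from the toolbar to the canvas.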
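## Task Example
First, some general DS parameters:
- **Node name**: Sets the name of the task. Node names are unique within a workflow definition.
- **Run flag**: Indicates whether the node can be scheduled normally. If it does not need to run, turn on the prohibit-execution switch.
- **Description**: Describes the function of the node.
- **Task priority**: When worker threads are insufficient, tasks run in order of priority from high to low; tasks with the same priority run first-in, first-out.
- **Worker group**: The task is assigned to the machines of the worker group. If Default is selected, a worker machine is chosen at random.
- **Environment name**: The environment in which the script runs.
- **Failed retry times**: The number of times a failed task is resubmitted.
- **Failed retry interval**: The interval, in minutes, between resubmissions of a failed task.
- **Delayed execution time**: The time, in minutes, by which task execution is delayed.
- **Timeout alarm**: Check timeout alarm and timeout failure. When the task exceeds the "timeout duration", an alarm email is sent and the task execution fails.
- **Predecessor task**: Selecting a predecessor task for the current task sets it as the upstream of the current task.
Here are some common parameters of the DVC component:
- **DVC task type**: Upload, Download, or Init DVC.
- **DVC repository**: The repository address associated with the task execution.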
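### Init DVC
Initialize a Git repository as a DVC repository and bind the place where the data is stored.
After initialization the project is still a Git repository, but with DVC features added.
The data is not actually stored in the Git repository but somewhere else; DVC tracks the version and address of the data and manages this relationship.
![dvc_init](../../../../img/tasks/demo/dvc_init.png)
**Task parameters**
- **Data store address**: The address where the actual data is stored. For the supported types, see [DVC supported storage types](https://dvc.org/doc/command-reference/remote/add#supported-storage-types).
The example above means: initialize repository `git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git` as a DVC project and bind the remote storage address to `~/dvc`.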
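### Upload
Used to upload and update data and record the version number.
![dvc_upload](../../../../img/tasks/demo/dvc_upload.png)
**Task parameters**
- **Data path in DVC repository**: The path in the repository where the uploaded data is saved.
- **Data path in Worker**: The path of the data to upload.
- **Data version**: The version number given to the data after upload; it is saved as a git tag.
- **Data version message**: The note to record for this upload.
The example above means: upload data `/home/data/iris` to the root directory of repository `git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git`.
The file or folder of data is named `iris`. Then run `git tag "iris_1.0" -m "init iris data"` to record the version number `iris_1.0` and the version message `init iris data`.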
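### Download
Used to download a specific version of the data.
![dvc_download](../../../../img/tasks/demo/dvc_download.png)
**Task parameters**
- **Data path in DVC repository**: The path in the repository of the data to download.
- **Data path in Worker**: The local path where the data is saved after download.
- **Data version**: The version of the data to download.
The example above means: download the iris data at version `iris_1.0` from repository `git@github.com:xxxx/dvc-data-repository-example.git` to `~/dvc_test/iris`.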
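## Environment Preparation
### Install dvc
Make sure DVC is installed. If it is not, you can install it with `pip install dvc`.
Get the dvc path and configure the environment variables.
The following uses a pip installation in a conda Python environment as an example and configures conda's environment variables so that the component can find the `dvc` command.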
```shell
which dvc
# >> ~/anaconda3/bin/dvc
```
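Log in with the admin account to configure a conda environment variable (please [install anaconda](https://docs.continuum.io/anaconda/install/)
or [install miniconda](https://docs.conda.io/en/latest/miniconda.html#installing) in advance).
![dvc_env_config](../../../../img/tasks/demo/dvc_env_config.png)
Note that when configuring the task later, you must select the conda environment created above; otherwise the program cannot find the conda environment.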
![dvc_env_name](../../../../img/tasks/demo/dvc_env_name.png)

BIN
docs/img/tasks/demo/dvc_download.png

Binary file not shown. Size: 24 KiB

BIN
docs/img/tasks/demo/dvc_env_config.png

Binary file not shown. Size: 119 KiB

BIN
docs/img/tasks/demo/dvc_env_name.png

Binary file not shown. Size: 49 KiB

BIN
docs/img/tasks/demo/dvc_init.png

Binary file not shown. Size: 16 KiB

BIN
docs/img/tasks/demo/dvc_upload.png

Binary file not shown. Size: 28 KiB

BIN
docs/img/tasks/icons/dvc.png

Binary file not shown. Size: 7.5 KiB

6
dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml

@ -171,6 +171,12 @@
<artifactId>dolphinscheduler-task-openmldb</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-dvc</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

46
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/pom.xml

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-task-plugin</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>dev-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-task-dvc</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
</dependencies>
</project>

56
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dvc;
public class DvcConstants {
private DvcConstants() {
throw new IllegalStateException("Utility class");
}
public static final String CHECK_AND_SET_DVC_REPO = "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=%s";
public static final String SET_DATA_PATH = "DVC_DATA_PATH=%s";
public static final String SET_DATA_LOCATION = "DVC_DATA_LOCATION=%s";
public static final String SET_VERSION = "DVC_VERSION=%s";
public static final String SET_MESSAGE = "DVC_MESSAGE=\"%s\"";
public static final String GIT_CLONE_DVC_REPO = "git clone $DVC_REPO dvc-repository; cd dvc-repository; pwd";
public static final String DVC_AUTOSTAGE = "dvc config core.autostage true --local || exit 1";
public static final String DVC_ADD_DATA = "dvc add $DVC_DATA_PATH -v -o $DVC_DATA_LOCATION --to-remote || exit 1";
public static final String GIT_UPDATE_FOR_UPDATE_DATA = "git commit -am \"$DVC_MESSAGE\"\n" +
"git tag \"$DVC_VERSION\" -m \"$DVC_MESSAGE\"\n" +
"git push --all\n" +
"git push --tags";
public static final String DVC_DOWNLOAD = "dvc get $DVC_REPO $DVC_DATA_LOCATION -o $DVC_DATA_PATH -v --rev $DVC_VERSION";
public static final String DVC_INIT = "dvc init || exit 1";
public static final String DVC_ADD_REMOTE = "dvc remote add origin %s -d";
public static final String GIT_UPDATE_FOR_INIT_DVC = "git commit -am \"init dvc project and add remote\"; git push";
}

105
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dvc;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
public class DvcParameters extends AbstractParameters {
/**
* common parameters
*/
private TaskTypeEnum dvcTaskType;
private String dvcRepository;
private String dvcVersion;
private String dvcDataLocation;
private String dvcMessage;
private String dvcLoadSaveDataPath;
private String dvcStoreUrl;
public void setDvcTaskType(TaskTypeEnum dvcTaskType) {
this.dvcTaskType = dvcTaskType;
}
public TaskTypeEnum getDvcTaskType() {
return dvcTaskType;
}
public void setDvcRepository(String dvcRepository) {
this.dvcRepository = dvcRepository;
}
public String getDvcRepository() {
return dvcRepository;
}
public void setDvcVersion(String dvcVersion) {
this.dvcVersion = dvcVersion;
}
public String getDvcVersion() {
return dvcVersion;
}
public void setDvcDataLocation(String dvcDataLocation) {
this.dvcDataLocation = dvcDataLocation;
}
public String getDvcDataLocation() {
return dvcDataLocation;
}
public void setDvcMessage(String dvcMessage) {
this.dvcMessage = dvcMessage;
}
public String getDvcMessage() {
return dvcMessage;
}
public void setDvcLoadSaveDataPath(String dvcLoadSaveDataPath) {
this.dvcLoadSaveDataPath = dvcLoadSaveDataPath;
}
public String getDvcLoadSaveDataPath() {
return dvcLoadSaveDataPath;
}
public void setDvcStoreUrl(String dvcStoreUrl) {
this.dvcStoreUrl = dvcStoreUrl;
}
public String getDvcStoreUrl() {
return dvcStoreUrl;
}
@Override
public boolean checkParameters() {
// No validation is performed yet; every parameter set is accepted.
return true;
}
}

163
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dvc;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.ArrayList;
import java.util.List;
/**
* DVC task
*/
public class DvcTask extends AbstractTaskExecutor {
/**
* dvc parameters
*/
private DvcParameters parameters;
/**
* shell command executor
*/
private ShellCommandExecutor shellCommandExecutor;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
public DvcTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
}
@Override
public void init() {
logger.info("dvc task params {}", taskExecutionContext.getTaskParams());
parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DvcParameters.class);
if (!parameters.checkParameters()) {
throw new RuntimeException("dvc task params is not valid");
}
}
@Override
public void handle() throws Exception {
try {
// construct process
String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
parameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (Exception e) {
logger.error("dvc task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
}
}
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
shellCommandExecutor.cancelApplication();
}
public String buildCommand() {
String command = "";
TaskTypeEnum taskType = parameters.getDvcTaskType();
// pick the script builder that matches the configured task type
if (taskType == TaskTypeEnum.UPLOAD) {
command = buildUploadCommand();
} else if (taskType == TaskTypeEnum.DOWNLOAD) {
command = buildDownloadCommand();
} else if (taskType == TaskTypeEnum.INIT) {
command = buildInitDvcCommand();
}
logger.info("Run DVC task with command: \n{}", command);
return command;
}
// clone the repository, add the data to the remote, then commit, tag and push
private String buildUploadCommand() {
List<String> args = new ArrayList<>();
args.add(String.format(DvcConstants.CHECK_AND_SET_DVC_REPO, parameters.getDvcRepository()));
args.add(String.format(DvcConstants.SET_DATA_PATH, parameters.getDvcLoadSaveDataPath()));
args.add(String.format(DvcConstants.SET_DATA_LOCATION, parameters.getDvcDataLocation()));
args.add(String.format(DvcConstants.SET_VERSION, parameters.getDvcVersion()));
args.add(String.format(DvcConstants.SET_MESSAGE, parameters.getDvcMessage()));
args.add(DvcConstants.GIT_CLONE_DVC_REPO);
args.add(DvcConstants.DVC_AUTOSTAGE);
args.add(DvcConstants.DVC_ADD_DATA);
args.add(DvcConstants.GIT_UPDATE_FOR_UPDATE_DATA);
return String.join("\n", args);
}
// fetch one version of the data directly from the repository with dvc get
private String buildDownloadCommand() {
List<String> args = new ArrayList<>();
args.add(String.format(DvcConstants.CHECK_AND_SET_DVC_REPO, parameters.getDvcRepository()));
args.add(String.format(DvcConstants.SET_DATA_PATH, parameters.getDvcLoadSaveDataPath()));
args.add(String.format(DvcConstants.SET_DATA_LOCATION, parameters.getDvcDataLocation()));
args.add(String.format(DvcConstants.SET_VERSION, parameters.getDvcVersion()));
args.add(DvcConstants.DVC_DOWNLOAD);
return String.join("\n", args);
}
// clone the repository, run dvc init, register the default remote, then commit and push
private String buildInitDvcCommand() {
List<String> args = new ArrayList<>();
args.add(String.format(DvcConstants.CHECK_AND_SET_DVC_REPO, parameters.getDvcRepository()));
args.add(DvcConstants.GIT_CLONE_DVC_REPO);
args.add(DvcConstants.DVC_INIT);
args.add(String.format(DvcConstants.DVC_ADD_REMOTE, parameters.getDvcStoreUrl()));
args.add(DvcConstants.GIT_UPDATE_FOR_INIT_DVC);
return String.join("\n", args);
}
@Override
public AbstractParameters getParameters() {
return parameters;
}
}

51
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannel.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dvc;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
public class DvcTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
}
@Override
public DvcTask createTask(TaskExecutionContext taskRequest) {
return new DvcTask(taskRequest);
}
@Override
public AbstractParameters parseParameters(ParametersNode parametersNode) {
return JSONUtils.parseObject(parametersNode.getTaskParams(), DvcParameters.class);
}
@Override
public ResourceParametersHelper getResources(String parameters) {
return null;
}
}

58
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannelFactory.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dvc;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
import org.apache.dolphinscheduler.spi.params.input.InputParam;
import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
import java.util.ArrayList;
import java.util.List;
import com.google.auto.service.AutoService;
@AutoService(TaskChannelFactory.class)
public class DvcTaskChannelFactory implements TaskChannelFactory {
@Override
public TaskChannel create() {
return new DvcTaskChannel();
}
@Override
public String getName() {
return "DVC";
}
@Override
public List<PluginParams> getParams() {
List<PluginParams> paramsList = new ArrayList<>();
InputParam nodeName = InputParam.newBuilder("name", "$t('Node name')").addValidate(Validate.newBuilder().setRequired(true).build()).build();
RadioParam runFlag = RadioParam.newBuilder("runFlag", "RUN_FLAG").addParamsOptions(new ParamsOptions("NORMAL", "NORMAL", false)).addParamsOptions(new ParamsOptions("FORBIDDEN", "FORBIDDEN", false)).build();
paramsList.add(nodeName);
paramsList.add(runFlag);
return paramsList;
}
}

30
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dvc;
import com.fasterxml.jackson.annotation.JsonProperty;
public enum TaskTypeEnum {
@JsonProperty("Upload")
UPLOAD,
@JsonProperty("Download")
DOWNLOAD,
@JsonProperty("Init DVC")
INIT
}

153
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dvc;
import java.util.Date;
import java.util.UUID;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
@RunWith(PowerMockRunner.class)
@PrepareForTest({
JSONUtils.class,
PropertyUtils.class,
})
@PowerMockIgnore({"javax.*"})
@SuppressStaticInitializationFor("org.apache.dolphinscheduler.spi.utils.PropertyUtils")
public class DvcTaskTest {
@Before
public void before() throws Exception {
PowerMockito.mockStatic(PropertyUtils.class);
}
public TaskExecutionContext createContext(DvcParameters dvcParameters) {
String parameters = JSONUtils.toJsonString(dvcParameters);
TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
Mockito.when(taskExecutionContext.getTaskLogName()).thenReturn("DvcTest");
Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp/dolphinscheduler_dvc_test");
Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn(UUID.randomUUID().toString());
Mockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date());
Mockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
Mockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dolphinscheduler_dvc_test/log");
Mockito.when(taskExecutionContext.getEnvironmentConfig()).thenReturn("export PATH=$HOME/anaconda3/bin:$PATH");
String userName = System.getenv().get("USER");
Mockito.when(taskExecutionContext.getTenantCode()).thenReturn(userName);
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
return taskExecutionContext;
}
private DvcTask initTask(DvcParameters parameters) {
TaskExecutionContext taskExecutionContext = createContext(parameters);
DvcTask dvcTask = new DvcTask(taskExecutionContext);
dvcTask.init();
dvcTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
return dvcTask;
}
@Test
public void testDvcUpload() throws Exception{
DvcTask dvcTask = initTask(createUploadParameters());
// JUnit's assertEquals takes (expected, actual); the expected script goes first.
Assert.assertEquals(
"which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example\n" +
"DVC_DATA_PATH=/home/<YOUR-NAME-OR-ORG>/test\n" +
"DVC_DATA_LOCATION=test\n" +
"DVC_VERSION=iris_v2.3.1\n" +
"DVC_MESSAGE=\"add test iris data\"\n" +
"git clone $DVC_REPO dvc-repository; cd dvc-repository; pwd\n" +
"dvc config core.autostage true --local || exit 1\n" +
"dvc add $DVC_DATA_PATH -v -o $DVC_DATA_LOCATION --to-remote || exit 1\n" +
"git commit -am \"$DVC_MESSAGE\"\n" +
"git tag \"$DVC_VERSION\" -m \"$DVC_MESSAGE\"\n" +
"git push --all\n" +
"git push --tags",
dvcTask.buildCommand());
}
@Test
public void testDvcDownload() throws Exception{
DvcTask dvcTask = initTask(createDownloadParameters());
// JUnit's assertEquals takes (expected, actual); the expected script goes first.
Assert.assertEquals(
"which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example\n" +
"DVC_DATA_PATH=data\n" +
"DVC_DATA_LOCATION=iris\n" +
"DVC_VERSION=iris_v2.3.1\n" +
"dvc get $DVC_REPO $DVC_DATA_LOCATION -o $DVC_DATA_PATH -v --rev $DVC_VERSION",
dvcTask.buildCommand());
}
@Test
public void testInitDvc() throws Exception{
DvcTask dvcTask = initTask(createInitDvcParameters());
// JUnit's assertEquals takes (expected, actual); the expected script goes first.
Assert.assertEquals(
"which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example\n" +
"git clone $DVC_REPO dvc-repository; cd dvc-repository; pwd\n" +
"dvc init || exit 1\n" +
"dvc remote add origin ~/.dvc_test -d\n" +
"git commit -am \"init dvc project and add remote\"; git push",
dvcTask.buildCommand());
}
private DvcParameters createUploadParameters() {
DvcParameters parameters = new DvcParameters();
parameters.setDvcTaskType(TaskTypeEnum.UPLOAD);
parameters.setDvcRepository("git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example");
parameters.setDvcLoadSaveDataPath("/home/<YOUR-NAME-OR-ORG>/test");
parameters.setDvcDataLocation("test");
parameters.setDvcVersion("iris_v2.3.1");
parameters.setDvcMessage("add test iris data");
return parameters;
}
private DvcParameters createDownloadParameters() {
DvcParameters parameters = new DvcParameters();
parameters.setDvcTaskType(TaskTypeEnum.DOWNLOAD);
parameters.setDvcRepository("git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example");
parameters.setDvcLoadSaveDataPath("data");
parameters.setDvcDataLocation("iris");
parameters.setDvcVersion("iris_v2.3.1");
return parameters;
}
private DvcParameters createInitDvcParameters() {
DvcParameters parameters = new DvcParameters();
parameters.setDvcTaskType(TaskTypeEnum.INIT);
parameters.setDvcRepository("git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example");
parameters.setDvcStoreUrl("~/.dvc_test");
return parameters;
}
}
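
The download script asserted in `testDvcDownload` follows a simple pattern: a `which dvc` guard, a block of shell variable assignments, then a single `dvc get` call. A minimal sketch of that assembly, assuming hypothetical names (`buildDownloadScript`, `DvcDownloadParams`) that are not part of the plugin; the actual Java implementation in `DvcTask` may be structured differently:

```typescript
// Sketch only: reproduces the script string that testDvcDownload expects.
// buildDownloadScript and DvcDownloadParams are hypothetical, not plugin APIs.
interface DvcDownloadParams {
  repository: string // git URL of the DVC repository
  dataPath: string // destination path on the worker
  dataLocation: string // path of the data inside the DVC repository
  version: string // git tag that identifies the data version
}

// Guard taken verbatim from the expected strings in the tests.
const CHECK_DVC = 'which dvc || { echo "dvc does not exist"; exit 1; }; '

function buildDownloadScript(p: DvcDownloadParams): string {
  return (
    CHECK_DVC +
    [
      `DVC_REPO=${p.repository}`,
      `DVC_DATA_PATH=${p.dataPath}`,
      `DVC_DATA_LOCATION=${p.dataLocation}`,
      `DVC_VERSION=${p.version}`,
      // Single quotes: the $VARS are expanded by the shell, not by TypeScript.
      'dvc get $DVC_REPO $DVC_DATA_LOCATION -o $DVC_DATA_PATH -v --rev $DVC_VERSION'
    ].join('\n')
  )
}
```

As in the tested output, the values are written unquoted, so a path or tag containing spaces would presumably need quoting before it reaches the shell.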

1
dolphinscheduler-task-plugin/pom.xml

@ -55,5 +55,6 @@
<module>dolphinscheduler-task-jupyter</module>
<module>dolphinscheduler-task-mlflow</module>
<module>dolphinscheduler-task-openmldb</module>
<module>dolphinscheduler-task-dvc</module>
</modules>
</project>

BIN
dolphinscheduler-ui/public/images/task-icons/dvc.png

Binary file not shown.

Size: 7.5 KiB

BIN
dolphinscheduler-ui/public/images/task-icons/dvc_hover.png

Binary file not shown.

Size: 17 KiB

10
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -676,6 +676,16 @@ export default {
openmldb_execute_mode_tips: 'Please select the execute mode',
openmldb_execute_mode_offline: 'offline',
openmldb_execute_mode_online: 'online',
dvc_task_type: 'DVC Task Type',
dvc_repository: 'DVC Repository',
dvc_repository_tips: 'Please input the URL of the DVC repository',
dvc_version: 'Version',
dvc_version_tips: 'Data version, will be marked as a git tag',
dvc_data_location: 'Data Path in DVC Repository',
dvc_message: 'Version Message',
dvc_load_save_data_path: 'Data Path In Worker',
dvc_store_url: 'Store Url',
dvc_empty_tips: 'This parameter cannot be empty',
send_email: 'Send Email',
log_display: 'Log display',
rows_of_result: 'rows of result',

10
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -664,6 +664,16 @@ export default {
openmldb_execute_mode_tips: '请选择执行模式',
openmldb_execute_mode_offline: '离线',
openmldb_execute_mode_online: '在线',
dvc_task_type: 'DVC任务类型',
dvc_repository: 'DVC仓库',
dvc_repository_tips: '请输入DVC仓库地址',
dvc_version: '数据版本',
dvc_version_tips: '数据版本标识,会以git tag的形式标记',
dvc_data_location: 'DVC仓库中的数据路径',
dvc_message: '提交信息',
dvc_load_save_data_path: 'Worker中数据路径',
dvc_store_url: '数据存储地址',
dvc_empty_tips: '该参数不能为空',
send_email: '发送邮件',
log_display: '日志显示',
rows_of_result: '行查询结果',

1
dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts

@ -70,3 +70,4 @@ export { useMlflow } from './use-mlflow'
export { useMlflowProjects } from './use-mlflow-projects'
export { useMlflowModels } from './use-mlflow-models'
export { useOpenmldb } from './use-openmldb'
export { useDvc } from './use-dvc'

171
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dvc.ts

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { useI18n } from 'vue-i18n'
import type { IJsonItem } from '../types'
import { watch, ref } from 'vue'
export const DVC_TASK_TYPE = [
{
label: 'Upload',
value: 'Upload'
},
{
label: 'Download',
value: 'Download'
},
{
label: 'Init DVC',
value: 'Init DVC'
}
]
export function useDvc(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
const dvcLoadSaveDataPathSpan = ref(0)
const dvcDataLocationSpan = ref(0)
const dvcVersionSpan = ref(0)
const dvcMessageSpan = ref(0)
const dvcStoreUrlSpan = ref(0)
const setFlag = () => {
model.isUpload = model.dvcTaskType === 'Upload'
model.isDownload = model.dvcTaskType === 'Download'
model.isInit = model.dvcTaskType === 'Init DVC'
}
const resetData = () => {
dvcLoadSaveDataPathSpan.value = model.isUpload || model.isDownload ? 24 : 0
dvcDataLocationSpan.value = model.isUpload || model.isDownload ? 24 : 0
dvcVersionSpan.value = model.isUpload || model.isDownload ? 24 : 0
dvcMessageSpan.value = model.isUpload ? 24 : 0
dvcStoreUrlSpan.value = model.isInit ? 24 : 0
}
watch(
() => [model.dvcTaskType],
() => {
setFlag()
resetData()
}
)
setFlag()
resetData()
return [
{
type: 'select',
field: 'dvcTaskType',
name: t('project.node.dvc_task_type'),
span: 12,
options: DVC_TASK_TYPE
},
{
type: 'input',
field: 'dvcRepository',
name: t('project.node.dvc_repository'),
span: 24,
props: {
placeholder: t('project.node.dvc_repository_tips')
},
validate: {
trigger: ['input', 'blur'],
required: true,
message: t('project.node.dvc_empty_tips')
}
},
{
type: 'input',
field: 'dvcDataLocation',
name: t('project.node.dvc_data_location'),
props: { placeholder: 'dvcDataLocation' },
span: dvcDataLocationSpan,
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if ((model.isUpload || model.isDownload) && !value) {
return new Error(t('project.node.dvc_empty_tips'))
}
}
}
},
{
type: 'input',
field: 'dvcLoadSaveDataPath',
name: t('project.node.dvc_load_save_data_path'),
span: dvcLoadSaveDataPathSpan,
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if ((model.isUpload || model.isDownload) && !value) {
return new Error(t('project.node.dvc_empty_tips'))
}
}
}
},
{
type: 'input',
field: 'dvcVersion',
name: t('project.node.dvc_version'),
span: dvcVersionSpan,
props: {
placeholder: t('project.node.dvc_version_tips')
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if ((model.isUpload || model.isDownload) && !value) {
return new Error(t('project.node.dvc_empty_tips'))
}
}
}
},
{
type: 'input',
field: 'dvcMessage',
name: t('project.node.dvc_message'),
span: dvcMessageSpan,
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (model.isUpload && !value) {
return new Error(t('project.node.dvc_empty_tips'))
}
}
}
},
{
type: 'input',
field: 'dvcStoreUrl',
name: t('project.node.dvc_store_url'),
span: dvcStoreUrlSpan,
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (model.isInit && !value) {
return new Error(t('project.node.dvc_empty_tips'))
}
}
}
}
]
}
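
The visibility rules in `useDvc` above reduce to a mapping from task type to field span: a span of 24 shows the field at full width and 0 hides it. A minimal sketch of that mapping as a pure function, assuming a hypothetical helper `dvcFieldSpans` that is not part of this commit:

```typescript
// Hypothetical helper, not part of the commit: mirrors setFlag/resetData in useDvc.
type DvcTaskType = 'Upload' | 'Download' | 'Init DVC'

interface DvcFieldSpans {
  dvcLoadSaveDataPath: number
  dvcDataLocation: number
  dvcVersion: number
  dvcMessage: number
  dvcStoreUrl: number
}

function dvcFieldSpans(taskType: DvcTaskType): DvcFieldSpans {
  const isUpload = taskType === 'Upload'
  const isDownload = taskType === 'Download'
  const isInit = taskType === 'Init DVC'
  // Upload and Download share the data path, data location and version fields.
  const transferSpan = isUpload || isDownload ? 24 : 0
  return {
    dvcLoadSaveDataPath: transferSpan,
    dvcDataLocation: transferSpan,
    dvcVersion: transferSpan,
    dvcMessage: isUpload ? 24 : 0, // only an upload creates a commit and tag message
    dvcStoreUrl: isInit ? 24 : 0 // only Init DVC configures the remote store
  }
}
```

Keeping the mapping in one function would make it straightforward to unit-test without mounting the form; whether that refactor is worthwhile here is a judgment call.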

11
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -359,6 +359,17 @@ export function formatParams(data: INodeData): {
taskParams.memoryLimit = data.memoryLimit
}
if (data.taskType === 'DVC') {
taskParams.dvcTaskType = data.dvcTaskType
taskParams.dvcRepository = data.dvcRepository
taskParams.dvcVersion = data.dvcVersion
taskParams.dvcDataLocation = data.dvcDataLocation
taskParams.dvcMessage = data.dvcMessage
taskParams.dvcLoadSaveDataPath = data.dvcLoadSaveDataPath
taskParams.dvcStoreUrl = data.dvcStoreUrl
}
if (data.taskType === 'OPENMLDB') {
taskParams.zk = data.zk
taskParams.zkPath = data.zkPath

4
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts

@ -38,6 +38,7 @@ import { useK8s } from './use-k8s'
import { useJupyter } from './use-jupyter'
import { useMlflow } from './use-mlflow'
import { useOpenmldb } from './use-openmldb'
import { useDvc } from './use-dvc'
export default {
SHELL: useShell,
@ -62,5 +63,6 @@ export default {
K8S: useK8s,
JUPYTER: useJupyter,
MLFLOW: useMlflow,
OPENMLDB: useOpenmldb,
DVC: useDvc
}

82
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive } from 'vue'
import * as Fields from '../fields/index'
import type { IJsonItem, INodeData, ITaskData } from '../types'
export function useDvc({
projectCode,
from = 0,
readonly,
data
}: {
projectCode: number
from?: number
readonly?: boolean
data?: ITaskData
}) {
const model = reactive({
name: '',
taskType: 'DVC',
flag: 'YES',
description: '',
timeoutFlag: false,
localParams: [],
environmentCode: null,
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
delayTime: 0,
timeout: 30,
timeoutNotifyStrategy: ['WARN'],
dvcTaskType: 'Upload',
} as INodeData)
let extra: IJsonItem[] = []
if (from === 1) {
extra = [
Fields.useTaskType(model, readonly),
Fields.useProcessName({
model,
projectCode,
isCreate: !data?.id,
from,
processName: data?.processName
})
]
}
return {
json: [
Fields.useName(from),
...extra,
Fields.useRunFlag(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),
Fields.useEnvironmentName(model, !model.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useDvc(model),
Fields.usePreTasks()
] as IJsonItem[],
model
}
}

7
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@ -341,6 +341,13 @@ interface ITaskParams {
zk?: string
zkPath?: string
executeMode?: string
dvcTaskType?: string
dvcRepository?: string
dvcVersion?: string
dvcDataLocation?: string
dvcMessage?: string
dvcLoadSaveDataPath?: string
dvcStoreUrl?: string
}
interface INodeData

5
dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts

@ -38,6 +38,7 @@ export type TaskType =
| 'JUPYTER'
| 'MLFLOW'
| 'OPENMLDB'
| 'DVC'
export const TASK_TYPES_MAP = {
SHELL: {
@ -118,5 +119,9 @@ export const TASK_TYPES_MAP = {
OPENMLDB: {
alias: 'OPENMLDB',
helperLinkDisable: true
},
DVC: {
alias: 'DVC',
helperLinkDisable: true
}
} as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } }

6
dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

@ -170,6 +170,9 @@ $bgLight: #ffffff;
&.icon-openmldb {
background-image: url('/images/task-icons/openmldb.png');
}
&.icon-dvc {
background-image: url('/images/task-icons/dvc.png');
}
}
&:hover {
@ -243,6 +246,9 @@ $bgLight: #ffffff;
&.icon-openmldb {
background-image: url('/images/task-icons/openmldb_hover.png');
}
&.icon-dvc {
background-image: url('/images/task-icons/dvc_hover.png');
}
}
}
}
