diff --git a/docs/docs/en/guide/task/dvc.md b/docs/docs/en/guide/task/dvc.md new file mode 100644 index 0000000000..102295eb40 --- /dev/null +++ b/docs/docs/en/guide/task/dvc.md @@ -0,0 +1,125 @@ +# DVC Node + +## Overview + +[DVC (Data Version Control)](https://dvc.org) is an excellent open-source version control system for machine learning projects. + +The DVC plugin is used to use the data version management function of DVC on DolphinScheduler, helping users to carry out data version management easily. + +The plugin provides the following three functions: + +- Init DVC: Initialize the Git repository as a DVC repository and bind the address where the data is stored to store the actual data. +- Upload: Add or update specific data to the repository and record the version tag. +- Download: Download a specific version of data from the repository. + +## Create Task + +- Click `Project -> Management-Project -> Name-Workflow Definition`, and click the "Create Workflow" button to enter the + DAG editing page. +- Drag from the toolbar task node to canvas. + +## Task Example + +First, introduce some general parameters of DolphinScheduler: + +- **Node name**: The node name in a workflow definition is unique. +- **Run flag**: Identifies whether this node schedules normally, if it does not need to execute, select + the `prohibition execution`. +- **Descriptive information**: Describe the function of the node. +- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high + to low, and tasks with the same priority will execute in a first-in first-out order. +- **Worker grouping**: Assign tasks to the machines of the worker group to execute. If `Default` is selected, + randomly select a worker machine for execution. +- **Environment Name**: Configure the environment name in which run the script. +- **Times of failed retry attempts**: The number of times the task failed to resubmit. +- **Failed retry interval**: The time interval (unit minute) for resubmitting the task after a failed task. +- **Delayed execution time**: The time (unit minute) that a task delays in execution. +- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm + email will send and the task execution will fail. +- **Predecessor task**: Selecting a predecessor task for the current task, will set the selected predecessor task as + upstream of the current task. + +Here are some specific parameters for the DVC plugin: + +- **DVC Task Type** :Upload, Download or Init DVC。 +- **DVC Repository** :The DVC repository address associated with the task execution. + +### Init DVC + +Initialize the Git repository as a DVC repository and add a new data remote to save data. + +After the project is initialized, it is still a Git repository, but with DVC features added. + +The data is not actually stored in a Git repository, but somewhere else, and DVC keeps track of the version and address of the data and handles this relationship. + +![dvc_init](../../../../img/tasks/demo/dvc_init.png) + +**Task Parameter** + +- **Remote Store Url** :The actual data is stored at the address. You can learn about the supported storage types from the [DVC supported storage types](https://dvc.org/doc/command-reference/remote/add#supported-storage-types) . + +The example above shows that: +Initialize repository `git@github.com:/dvc-data-repository-example.git` as a DVC project and bind the remote storage address to `~/dvc` + +### Upload + +Used to upload and update data and record version numbers. + +![dvc_upload](../../../../img/tasks/demo/dvc_upload.png) + +**Task Parameter** + +- **Data Path in DVC Repository** :The data will be uploaded to this path in the repository. +- **Data Path In Worker** :Data path to be uploaded. +- **Version** :After the data is uploaded, the version tag for the data will be saved in `git tag`. +- **Version Message** :Version Message. + +The example above shows that: + +Upload data `/home/data/iris` to the root directory of repository `git@github.com:/dvc-data-repository-example.git`. The file or folder of data is named `iris`. + +Then run `git tag "iris_1.0" -m "init iris data"`. Record the version tag `iris_1.0` and the version message `inir iris data`. + +### Download + +Used to download data for a specific version. + +![dvc_download](../../../../img/tasks/demo/dvc_download.png) + +**Task Parameter** + +- **Data Path in DVC Repository** :The path to the data to download in the DVC repository. +- **Data Path In Worker** :Path for saving data after the file is downloaded to the local. +- **Version** :The version of the data to download. + +The example above shows that: + +Download the data for iris data at version `iris_1.0` in repository `git@github.com:/dvc-data-repository-example.git` to the `~/dvc_test/iris` + +## Environment to prepare + +### Install DVC + +Make sure you have installed DVC, if not, you can run `pip install dvc` command to install it. + +Get the 'dvc' path and configure the environment variables. + +The conda environment is used as an example: + +Install python PIP on Conda and configure conda's environment variables so that the component can correctly find the 'DVC' command + +```shell +which dvc +# >> ~/anaconda3/bin/dvc +``` + +You need to enter the admin account to configure a conda environment variable(Please +install [anaconda](https://docs.continuum.io/anaconda/install/) +or [miniconda](https://docs.conda.io/en/latest/miniconda.html#installing ) in advance). + +![dvc_env_config](../../../../img/tasks/demo/dvc_env_config.png) + +Note During the configuration task, select the conda environment created above. Otherwise, the program cannot find the +Conda environment. + +![dvc_env_name](../../../../img/tasks/demo/dvc_env_name.png) \ No newline at end of file diff --git a/docs/docs/zh/guide/task/dvc.md b/docs/docs/zh/guide/task/dvc.md new file mode 100644 index 0000000000..3f9ff91907 --- /dev/null +++ b/docs/docs/zh/guide/task/dvc.md @@ -0,0 +1,110 @@ +# DVC节点 + +## 综述 + +[DVC(Data Version Control)](https://dvc.org) 是一个MLops领域一个优秀的开机器学习版本管理系统。 + +DVC 组件用于在DS上使用DVC的数据版本管理功能,帮助用户简易地进行数据的版本管理。组件提供如下三个功能: + +- Init DVC: 将git仓库初始化为DVC仓库,并绑定存储数据的地址用于存储实际的数据。 +- Upload: 将特定数据添加或者更新到仓库中,并记录版本号。 +- Download: 从仓库中下载特定版本的数据。 + +## 创建任务 + +- 点击项目管理-项目名称-工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面; +- 拖动工具栏的 任务节点到画板中。 + +## 任务样例 + +首先介绍一些DS通用参数 + +- **节点名称** :设置任务的名称。一个工作流定义中的节点名称是唯一的。 +- **运行标志** :标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。 +- **描述** :描述该节点的功能。 +- **任务优先级** :worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。 +- **Worker 分组** :任务分配给 worker 组的机器执行,选择 Default,会随机选择一台 worker 机执行。 +- **环境名称** :配置运行脚本的环境。 +- **失败重试次数** :任务失败重新提交的次数。 +- **失败重试间隔** :任务失败重新提交任务的时间间隔,以分钟为单位。 +- **延迟执行时间** :任务延迟执行的时间,以分钟为单位。 +- **超时告警** :勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。 +- **前置任务** :选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。 + +以下是一些DVC 组件的常用参数 + +- **DVC任务类型** :可以选择 Upload、Download、Init DVC。 +- **DVC仓库** :任务执行时关联的仓库地址。 + +### Init DVC + + +将git仓库初始化为DVC仓库, 并绑定数据储存的地方。 + +项目初始化后,仍然为git仓库,不过添加了DVC的特性。 + +实际上数据并不保存在git仓库,而是存储在另外的地方,DVC会跟踪数据的版本和地址,并处理好这个关系。 + +![dvc_init](../../../../img/tasks/demo/dvc_init.png) + +**任务参数** + +- **数据存储地址** + :实际的数据保存的地址,支持的类型可见 [DVC supported storage types](https://dvc.org/doc/command-reference/remote/add#supported-storage-types) + 。 + +如上述例子表示: 将仓库 `git@github.com:/dvc-data-repository-example.git` 初始化为DVC项目,并绑定远程储存地址为 `~/dvc` + +### Upload + +用于上传和更新数据,并记录版本号。 + +![dvc_upload](../../../../img/tasks/demo/dvc_upload.png) + +**任务参数** + +- **DVC仓库中的数据路径** :上传的数据保存到仓库的地址。 +- **Worker中数据路径** :需要上传的数据的地址。 +- **数据版本** :上传数据后,为该版本数据打上的版本号,会保存到 git tag 里面。 +- **数据版本信息** :本次上传需要备注的信息。 + +如上述例子表示: 将数据 `/home/data/iris` 上传到仓库 `git@github.com:/dvc-data-repository-example.git` +的根目录下,数据的文件/文件夹名字为`iris`。 然后执行 `git tag "iris_1.0" -m "init iris data"`。 记录版本号 `iris_1.0`和 版本信息 'inir iris data' + +### Download + +用于下载特定版本的数据。 + +![dvc_download](../../../../img/tasks/demo/dvc_download.png) + +**任务参数** + +- **DVC仓库中的数据路径** :需要下载数据在仓库中的路径。 +- **Worker中数据路径** :数据下载到本地后的保存地址。 +- **数据版本** :需要下载的数据的版本。 + +如上述例子表示: 将仓库 `git@github.com:xxxx/dvc-data-repository-example.git` 版本为 `iris_1.0` 的 iris 的数据下载到 `~/dvc_test/iris` + +## 环境准备 + +### dvc 安装 + +确保你已经安装DVC可以使用`pip install dvc`进行安装。 + +获取dvc地址, 并配置环境变量 + +下面以 conda 上的 python pip 安装为例子,配置 conda 的环境变量,使得组件能正确找到`dvc`命令 + +```shell +which dvc +# >> ~/anaconda3/bin/dvc +``` + +你需要进入admin账户配置一个conda环境变量(请提前[安装anaconda](https://docs.continuum.io/anaconda/install/) +或者[安装miniconda](https://docs.conda.io/en/latest/miniconda.html#installing) )。 + +![dvc_env_config](../../../../img/tasks/demo/dvc_env_config.png) + +后续注意配置任务时,环境选择上面创建的conda环境,否则程序会找不到conda环境。 + +![dvc_env_name](../../../../img/tasks/demo/dvc_env_name.png) diff --git a/docs/img/tasks/demo/dvc_download.png b/docs/img/tasks/demo/dvc_download.png new file mode 100644 index 0000000000..b6ed9edd61 Binary files /dev/null and b/docs/img/tasks/demo/dvc_download.png differ diff --git a/docs/img/tasks/demo/dvc_env_config.png b/docs/img/tasks/demo/dvc_env_config.png new file mode 100644 index 0000000000..3bbfe44cec Binary files /dev/null and b/docs/img/tasks/demo/dvc_env_config.png differ diff --git a/docs/img/tasks/demo/dvc_env_name.png b/docs/img/tasks/demo/dvc_env_name.png new file mode 100644 index 0000000000..17973ab621 Binary files /dev/null and b/docs/img/tasks/demo/dvc_env_name.png differ diff --git a/docs/img/tasks/demo/dvc_init.png b/docs/img/tasks/demo/dvc_init.png new file mode 100644 index 0000000000..7da5ada16e Binary files /dev/null and b/docs/img/tasks/demo/dvc_init.png differ diff --git a/docs/img/tasks/demo/dvc_upload.png b/docs/img/tasks/demo/dvc_upload.png new file mode 100644 index 0000000000..274c1a813a Binary files /dev/null and b/docs/img/tasks/demo/dvc_upload.png differ diff --git a/docs/img/tasks/icons/dvc.png b/docs/img/tasks/icons/dvc.png new file mode 100644 index 0000000000..404640c862 Binary files /dev/null and b/docs/img/tasks/icons/dvc.png differ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml index 2422a4628e..7a9d8d91ae 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml @@ -171,6 +171,12 @@ dolphinscheduler-task-openmldb ${project.version} + + + org.apache.dolphinscheduler + dolphinscheduler-task-dvc + ${project.version} + \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/pom.xml new file mode 100644 index 0000000000..a0435647ca --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/pom.xml @@ -0,0 +1,46 @@ + + + + + dolphinscheduler-task-plugin + org.apache.dolphinscheduler + dev-SNAPSHOT + + 4.0.0 + + dolphinscheduler-task-dvc + jar + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + + + org.apache.commons + commons-collections4 + + + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java new file mode 100644 index 0000000000..b15d6d450c --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dvc; + +public class DvcConstants { + private DvcConstants() { + throw new IllegalStateException("Utility class"); + } + + public static final String CHECK_AND_SET_DVC_REPO = "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=%s"; + + public static final String SET_DATA_PATH = "DVC_DATA_PATH=%s"; + + public static final String SET_DATA_LOCATION = "DVC_DATA_LOCATION=%s"; + + public static final String SET_VERSION = "DVC_VERSION=%s"; + + public static final String SET_MESSAGE = "DVC_MESSAGE=\"%s\""; + + public static final String GIT_CLONE_DVC_REPO = "git clone $DVC_REPO dvc-repository; cd dvc-repository; pwd"; + + public static final String DVC_AUTOSTAGE = "dvc config core.autostage true --local || exit 1"; + + public static final String DVC_ADD_DATA = "dvc add $DVC_DATA_PATH -v -o $DVC_DATA_LOCATION --to-remote || exit 1"; + + public static final String GIT_UPDATE_FOR_UPDATE_DATA = "git commit -am \"$DVC_MESSAGE\"\n" + + "git tag \"$DVC_VERSION\" -m \"$DVC_MESSAGE\"\n" + + "git push --all\n" + + "git push --tags"; + + public static final String DVC_DOWNLOAD = "dvc get $DVC_REPO $DVC_DATA_LOCATION -o $DVC_DATA_PATH -v --rev $DVC_VERSION"; + + + public static final String DVC_INIT = "dvc init || exit 1"; + + public static final String DVC_ADD_REMOTE = "dvc remote add origin %s -d"; + + public static final String GIT_UPDATE_FOR_INIT_DVC = "git commit -am \"init dvc project and add remote\"; git push"; + +} + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java new file mode 100644 index 0000000000..97c404cb29 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dvc; + +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; + +public class DvcParameters extends AbstractParameters { + + /** + * common parameters + */ + + private TaskTypeEnum dvcTaskType; + + private String dvcRepository; + + private String dvcVersion; + + private String dvcDataLocation; + + private String dvcMessage; + + private String dvcLoadSaveDataPath; + + private String dvcStoreUrl; + + public void setDvcTaskType(TaskTypeEnum dvcTaskType) { + this.dvcTaskType = dvcTaskType; + } + + public TaskTypeEnum getDvcTaskType() { + return dvcTaskType; + } + + public void setDvcRepository(String dvcRepository) { + this.dvcRepository = dvcRepository; + } + + public String getDvcRepository() { + return dvcRepository; + } + + public void setDvcVersion(String dvcVersion) { + this.dvcVersion = dvcVersion; + } + + public String getDvcVersion() { + return dvcVersion; + } + + public void setDvcDataLocation(String dvcDataLocation) { + this.dvcDataLocation = dvcDataLocation; + } + + public String getDvcDataLocation() { + return dvcDataLocation; + } + + public void setDvcMessage(String dvcMessage) { + this.dvcMessage = dvcMessage; + } + + public String getDvcMessage() { + return dvcMessage; + } + + public void setDvcLoadSaveDataPath(String dvcLoadSaveDataPath) { + this.dvcLoadSaveDataPath = dvcLoadSaveDataPath; + } + + public String getDvcLoadSaveDataPath() { + return dvcLoadSaveDataPath; + } + + public void setDvcStoreUrl(String dvcStoreUrl) { + this.dvcStoreUrl = dvcStoreUrl; + } + + public String getDvcStoreUrl() { + return dvcStoreUrl; + } + + @Override + public boolean checkParameters() { + Boolean checkResult = true; + return checkResult; + } + +} + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java new file mode 100644 index 0000000000..24bd015dc9 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dvc; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; + +import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; +import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * shell task + */ +public class DvcTask extends AbstractTaskExecutor { + + /** + * dvc parameters + */ + private DvcParameters parameters; + + /** + * shell command executor + */ + private ShellCommandExecutor shellCommandExecutor; + + /** + * taskExecutionContext + */ + private TaskExecutionContext taskExecutionContext; + + /** + * constructor + * + * @param taskExecutionContext taskExecutionContext + */ + public DvcTask(TaskExecutionContext taskExecutionContext) { + super(taskExecutionContext); + + this.taskExecutionContext = taskExecutionContext; + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger); + } + + @Override + public void init() { + logger.info("dvc task params {}", taskExecutionContext.getTaskParams()); + + parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DvcParameters.class); + + if (!parameters.checkParameters()) { + throw new RuntimeException("dvc task params is not valid"); + } + } + + @Override + public void handle() throws Exception { + try { + // construct process + String command = buildCommand(); + TaskResponse commandExecuteResult = shellCommandExecutor.run(command); + setExitStatusCode(commandExecuteResult.getExitStatusCode()); + setAppIds(commandExecuteResult.getAppIds()); + setProcessId(commandExecuteResult.getProcessId()); + parameters.dealOutParam(shellCommandExecutor.getVarPool()); + } catch (Exception e) { + logger.error("dvc task error", e); + setExitStatusCode(EXIT_CODE_FAILURE); + throw e; + } + } + + @Override + public void cancelApplication(boolean cancelApplication) throws Exception { + // cancel process + shellCommandExecutor.cancelApplication(); + } + + public String buildCommand() { + String command = ""; + TaskTypeEnum taskType = parameters.getDvcTaskType(); + if (taskType == TaskTypeEnum.UPLOAD) { + command = buildUploadCommond(); + }else if (taskType == TaskTypeEnum.DOWNLOAD){ + command = buildDownCommond(); + }else if (taskType == TaskTypeEnum.INIT){ + command = buildInitDvcCommond(); + } + logger.info("Run DVC task with command: \n{}", command); + return command; + } + + private String buildUploadCommond() { + List args = new ArrayList<>(); + args.add(String.format(DvcConstants.CHECK_AND_SET_DVC_REPO, parameters.getDvcRepository())); + args.add(String.format(DvcConstants.SET_DATA_PATH, parameters.getDvcLoadSaveDataPath())); + args.add(String.format(DvcConstants.SET_DATA_LOCATION, parameters.getDvcDataLocation())); + args.add(String.format(DvcConstants.SET_VERSION, parameters.getDvcVersion())); + args.add(String.format(DvcConstants.SET_MESSAGE, parameters.getDvcMessage())); + args.add(DvcConstants.GIT_CLONE_DVC_REPO); + args.add(DvcConstants.DVC_AUTOSTAGE); + args.add(DvcConstants.DVC_ADD_DATA); + args.add(DvcConstants.GIT_UPDATE_FOR_UPDATE_DATA); + + String command = String.join("\n", args); + return command; + + } + + private String buildDownCommond() { + List args = new ArrayList<>(); + args.add(String.format(DvcConstants.CHECK_AND_SET_DVC_REPO, parameters.getDvcRepository())); + args.add(String.format(DvcConstants.SET_DATA_PATH, parameters.getDvcLoadSaveDataPath())); + args.add(String.format(DvcConstants.SET_DATA_LOCATION, parameters.getDvcDataLocation())); + args.add(String.format(DvcConstants.SET_VERSION, parameters.getDvcVersion())); + args.add(DvcConstants.DVC_DOWNLOAD); + + String command = String.join("\n", args); + return command; + + } + + private String buildInitDvcCommond() { + List args = new ArrayList<>(); + args.add(String.format(DvcConstants.CHECK_AND_SET_DVC_REPO, parameters.getDvcRepository())); + args.add(DvcConstants.GIT_CLONE_DVC_REPO); + args.add(DvcConstants.DVC_INIT); + args.add(String.format(DvcConstants.DVC_ADD_REMOTE, parameters.getDvcStoreUrl())); + args.add(DvcConstants.GIT_UPDATE_FOR_INIT_DVC); + + String command = String.join("\n", args); + return command; + + } + + + @Override + public AbstractParameters getParameters() { + return parameters; + } + + +} + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannel.java new file mode 100644 index 0000000000..adccccede7 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannel.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dvc; + +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; + + +public class DvcTaskChannel implements TaskChannel { + + @Override + public void cancelApplication(boolean status) { + + } + + @Override + public DvcTask createTask(TaskExecutionContext taskRequest) { + return new DvcTask(taskRequest); + } + + @Override + public AbstractParameters parseParameters(ParametersNode parametersNode) { + return JSONUtils.parseObject(parametersNode.getTaskParams(), DvcParameters.class); + } + + @Override + public ResourceParametersHelper getResources(String parameters) { + return null; + } + +} + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannelFactory.java new file mode 100644 index 0000000000..f028d4e0d6 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannelFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dvc; + +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; +import org.apache.dolphinscheduler.spi.params.base.ParamsOptions; +import org.apache.dolphinscheduler.spi.params.base.PluginParams; +import org.apache.dolphinscheduler.spi.params.base.Validate; +import org.apache.dolphinscheduler.spi.params.input.InputParam; +import org.apache.dolphinscheduler.spi.params.radio.RadioParam; + +import java.util.ArrayList; +import java.util.List; + +import com.google.auto.service.AutoService; + +@AutoService(TaskChannelFactory.class) +public class DvcTaskChannelFactory implements TaskChannelFactory { + @Override + public TaskChannel create() { + return new DvcTaskChannel(); + } + + @Override + public String getName() { + return "DVC"; + } + + @Override + public List getParams() { + List paramsList = new ArrayList<>(); + + InputParam nodeName = InputParam.newBuilder("name", "$t('Node name')").addValidate(Validate.newBuilder().setRequired(true).build()).build(); + + RadioParam runFlag = RadioParam.newBuilder("runFlag", "RUN_FLAG").addParamsOptions(new ParamsOptions("NORMAL", "NORMAL", false)).addParamsOptions(new ParamsOptions("FORBIDDEN", "FORBIDDEN", false)).build(); + + paramsList.add(nodeName); + paramsList.add(runFlag); + return paramsList; + } +} + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java new file mode 100644 index 0000000000..4cdce3eb23 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dvc; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public enum TaskTypeEnum { + + @JsonProperty("Upload") + UPLOAD, + @JsonProperty("Download") + DOWNLOAD, + @JsonProperty("Init DVC") + INIT +} \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java new file mode 100644 index 0000000000..2b42c7f77d --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dvc; + +import java.util.Date; +import java.util.UUID; + +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.dolphinscheduler.spi.utils.PropertyUtils; + +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.junit.runner.RunWith; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; + + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ + JSONUtils.class, + PropertyUtils.class, +}) +@PowerMockIgnore({"javax.*"}) +@SuppressStaticInitializationFor("org.apache.dolphinscheduler.spi.utils.PropertyUtils") +public class DvcTaskTest { + + @Before + public void before() throws Exception { + PowerMockito.mockStatic(PropertyUtils.class); + } + + public TaskExecutionContext createContext(DvcParameters dvcParameters) { + String parameters = JSONUtils.toJsonString(dvcParameters); + TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); + Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters); + Mockito.when(taskExecutionContext.getTaskLogName()).thenReturn("DvcTest"); + Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp/dolphinscheduler_dvc_test"); + Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn(UUID.randomUUID().toString()); + Mockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date()); + Mockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000); + Mockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dolphinscheduler_dvc_test/log"); + Mockito.when(taskExecutionContext.getEnvironmentConfig()).thenReturn("export PATH=$HOME/anaconda3/bin:$PATH"); + + String userName = System.getenv().get("USER"); + Mockito.when(taskExecutionContext.getTenantCode()).thenReturn(userName); + + TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + return taskExecutionContext; + } + + private DvcTask initTask(DvcParameters parameters) { + TaskExecutionContext taskExecutionContext = createContext(parameters); + DvcTask dvcTask = new DvcTask(taskExecutionContext); + dvcTask.init(); + dvcTask.getParameters().setVarPool(taskExecutionContext.getVarPool()); + return dvcTask; + + } + + @Test + public void testDvcUpload() throws Exception{ + DvcTask dvcTask = initTask(createUploadParameters()); + Assert.assertEquals(dvcTask.buildCommand(), + "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=git@github.com:/dvc-data-repository-example\n" + + "DVC_DATA_PATH=/home//test\n" + + "DVC_DATA_LOCATION=test\n" + + "DVC_VERSION=iris_v2.3.1\n" + + "DVC_MESSAGE=\"add test iris data\"\n" + + "git clone $DVC_REPO dvc-repository; cd dvc-repository; pwd\n" + + "dvc config core.autostage true --local || exit 1\n" + + "dvc add $DVC_DATA_PATH -v -o $DVC_DATA_LOCATION --to-remote || exit 1\n" + + "git commit -am \"$DVC_MESSAGE\"\n" + + "git tag \"$DVC_VERSION\" -m \"$DVC_MESSAGE\"\n" + + "git push --all\n" + + "git push --tags"); + + } + + @Test + public void testDvcDownload() throws Exception{ + DvcTask dvcTask = initTask(createDownloadParameters()); + Assert.assertEquals(dvcTask.buildCommand(), + "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=git@github.com:/dvc-data-repository-example\n" + + "DVC_DATA_PATH=data\n" + + "DVC_DATA_LOCATION=iris\n" + + "DVC_VERSION=iris_v2.3.1\n" + + "dvc get $DVC_REPO $DVC_DATA_LOCATION -o $DVC_DATA_PATH -v --rev $DVC_VERSION"); + } + + @Test + public void testInitDvc() throws Exception{ + DvcTask dvcTask = initTask(createInitDvcParameters()); + Assert.assertEquals(dvcTask.buildCommand(), + "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=git@github.com:/dvc-data-repository-example\n" + + "git clone $DVC_REPO dvc-repository; cd dvc-repository; pwd\n" + + "dvc init || exit 1\n" + + "dvc remote add origin ~/.dvc_test -d\n" + + "git commit -am \"init dvc project and add remote\"; git push"); + } + + private DvcParameters createUploadParameters() { + DvcParameters parameters = new DvcParameters(); + parameters.setDvcTaskType(TaskTypeEnum.UPLOAD); + parameters.setDvcRepository("git@github.com:/dvc-data-repository-example"); + parameters.setDvcLoadSaveDataPath("/home//test"); + parameters.setDvcDataLocation("test"); + parameters.setDvcVersion("iris_v2.3.1"); + parameters.setDvcMessage("add test iris data"); + return parameters; + } + + private DvcParameters createDownloadParameters() { + DvcParameters parameters = new DvcParameters(); + parameters.setDvcTaskType(TaskTypeEnum.DOWNLOAD); + parameters.setDvcRepository("git@github.com:/dvc-data-repository-example"); + parameters.setDvcLoadSaveDataPath("data"); + parameters.setDvcDataLocation("iris"); + parameters.setDvcVersion("iris_v2.3.1"); + return parameters; + } + + private DvcParameters createInitDvcParameters() { + DvcParameters parameters = new DvcParameters(); + parameters.setDvcTaskType(TaskTypeEnum.INIT); + parameters.setDvcRepository("git@github.com:/dvc-data-repository-example"); + parameters.setDvcStoreUrl("~/.dvc_test"); + return parameters; + } +} \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml index 43f0209e0f..f40e882d47 100644 --- a/dolphinscheduler-task-plugin/pom.xml +++ b/dolphinscheduler-task-plugin/pom.xml @@ -55,5 +55,6 @@ dolphinscheduler-task-jupyter dolphinscheduler-task-mlflow dolphinscheduler-task-openmldb + dolphinscheduler-task-dvc - + \ No newline at end of file diff --git a/dolphinscheduler-ui/public/images/task-icons/dvc.png b/dolphinscheduler-ui/public/images/task-icons/dvc.png new file mode 100644 index 0000000000..404640c862 Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/dvc.png differ diff --git a/dolphinscheduler-ui/public/images/task-icons/dvc_hover.png b/dolphinscheduler-ui/public/images/task-icons/dvc_hover.png new file mode 100644 index 0000000000..d114743521 Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/dvc_hover.png differ diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index df289e5da2..93e643690f 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -676,6 +676,16 @@ export default { openmldb_execute_mode_tips: 'Please select the execute mode', openmldb_execute_mode_offline: 'offline', openmldb_execute_mode_online: 'online', + dvc_task_type: 'DVC Task Type', + dvc_repository: 'DVC Repository', + dvc_repository_tips: 'please input the url of dvc repository', + dvc_version: 'Version', + dvc_version_tips: 'data version, will be mark as git tag', + dvc_data_location: 'Data Path in DVC Repository', + dvc_message: 'Version Message', + dvc_load_save_data_path: 'Data Path In Worker', + dvc_store_url: 'Store Url', + dvc_empty_tips: 'This parameter cannot be empty', send_email: 'Send Email', log_display: 'Log display', rows_of_result: 'rows of result', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index 51b0982d36..baed96432e 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -664,6 +664,16 @@ export default { openmldb_execute_mode_tips: '请选择执行模式', openmldb_execute_mode_offline: '离线', openmldb_execute_mode_online: '在线', + dvc_task_type: 'DVC任务类型', + dvc_repository: 'DVC仓库', + dvc_repository_tips: '请输入DVC仓库地址', + dvc_version: '数据版本', + dvc_version_tips: '数据版本标识,会以git tag的形式标记', + dvc_data_location: 'DVC仓库中的数据路径', + dvc_message: '提交信息', + dvc_load_save_data_path: 'Worker中数据路径', + dvc_store_url: '数据存储地址', + dvc_empty_tips: '该参数不能为空', send_email: '发送邮件', log_display: '日志显示', rows_of_result: '行查询结果', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts index 89cd27ba83..971fd0447b 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts @@ -70,3 +70,4 @@ export { useMlflow } from './use-mlflow' export { useMlflowProjects } from './use-mlflow-projects' export { useMlflowModels } from './use-mlflow-models' export { useOpenmldb } from './use-openmldb' +export { useDvc } from './use-dvc' diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dvc.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dvc.ts new file mode 100644 index 0000000000..0ceac153c3 --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dvc.ts @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { useI18n } from 'vue-i18n' +import type { IJsonItem } from '../types' +import { watch, ref } from 'vue' + +export const DVC_TASK_TYPE = [ + { + label: 'Upload', + value: 'Upload' + }, + { + label: 'Download', + value: 'Download' + }, + { + label: 'Init DVC', + value: 'Init DVC' + } +] + +export function useDvc(model: { [field: string]: any }): IJsonItem[] { + const { t } = useI18n() + + const dvcLoadSaveDataPathSpan = ref(0) + const dvcDataLocationSpan = ref(0) + const dvcVersionSpan = ref(0) + const dvcMessageSpan = ref(0) + const dvcStoreUrlSpan = ref(0) + + const setFlag = () => { + model.isUpload = model.dvcTaskType === 'Upload' + model.isDownload = model.dvcTaskType === 'Download' + model.isInit = model.dvcTaskType === 'Init DVC' + } + + const resetData = () => { + dvcLoadSaveDataPathSpan.value = model.isUpload || model.isDownload ? 24 : 0 + dvcDataLocationSpan.value = model.isUpload || model.isDownload ? 24 : 0 + dvcVersionSpan.value = model.isUpload || model.isDownload ? 24 : 0 + dvcMessageSpan.value = model.isUpload ? 24 : 0 + dvcStoreUrlSpan.value = model.isInit ? 24 : 0 + } + + watch( + () => [model.dvcTaskType], + () => { + setFlag() + resetData() + } + ) + setFlag() + resetData() + + return [ + { + type: 'select', + field: 'dvcTaskType', + name: t('project.node.dvc_task_type'), + span: 12, + options: DVC_TASK_TYPE + }, + { + type: 'input', + field: 'dvcRepository', + name: t('project.node.dvc_repository'), + span: 24, + props: { + placeholder: t('project.node.dvc_repository_tips') + }, + validate: { + trigger: ['input', 'blur'], + required: true, + message: t('project.node.dvc_empty_tips') + } + }, + { + type: 'input', + field: 'dvcDataLocation', + name: t('project.node.dvc_data_location'), + props: { placeholder: 'dvcDataLocation' }, + span: dvcDataLocationSpan, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, value: string) { + if ((model.isUpload || model.isDownload) && !value) { + return new Error(t('project.node.dvc_empty_tips')) + } + } + } + }, + { + type: 'input', + field: 'dvcLoadSaveDataPath', + name: t('project.node.dvc_load_save_data_path'), + span: dvcLoadSaveDataPathSpan, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, value: string) { + if ((model.isUpload || model.isDownload) && !value) { + return new Error(t('project.node.dvc_empty_tips')) + } + } + } + }, + { + type: 'input', + field: 'dvcVersion', + name: t('project.node.dvc_version'), + span: dvcVersionSpan, + props: { + placeholder: t('project.node.dvc_version_tips') + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, value: string) { + if ((model.isUpload || model.isDownload) && !value) { + return new Error(t('project.node.dvc_empty_tips')) + } + } + } + }, + { + type: 'input', + field: 'dvcMessage', + name: t('project.node.dvc_message'), + span: dvcMessageSpan, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, value: string) { + if (model.isUpload && !value) { + return new Error(t('project.node.dvc_empty_tips')) + } + } + } + }, + { + type: 'input', + field: 'dvcStoreUrl', + name: t('project.node.dvc_store_url'), + span: dvcStoreUrlSpan, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, value: string) { + if (!model.isInit && value) { + return new Error(t('project.node.dvc_empty_tips')) + } + } + } + } + ] +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 5cf480532a..ae5a2efbae 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -359,6 +359,17 @@ export function formatParams(data: INodeData): { taskParams.memoryLimit = data.memoryLimit } + if (data.taskType === 'DVC') { + + taskParams.dvcTaskType = data.dvcTaskType + taskParams.dvcRepository = data.dvcRepository + taskParams.dvcVersion = data.dvcVersion + taskParams.dvcDataLocation = data.dvcDataLocation + taskParams.dvcMessage = data.dvcMessage + taskParams.dvcLoadSaveDataPath = data.dvcLoadSaveDataPath + taskParams.dvcStoreUrl = data.dvcStoreUrl + } + if (data.taskType === 'OPENMLDB') { taskParams.zk = data.zk taskParams.zkPath = data.zkPath diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts index a151b70d70..d2a5a6dd01 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts @@ -38,6 +38,7 @@ import { useK8s } from './use-k8s' import { useJupyter } from './use-jupyter' import { useMlflow } from './use-mlflow' import { useOpenmldb } from './use-openmldb' +import { useDvc } from './use-dvc' export default { SHELL: useShell, @@ -62,5 +63,6 @@ export default { K8S: useK8s, JUPYTER: useJupyter, MLFLOW: useMlflow, - OPENMLDB: useOpenmldb + OPENMLDB: useOpenmldb, + DVC: useDvc } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts new file mode 100644 index 0000000000..5838224e13 --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { reactive } from 'vue' +import * as Fields from '../fields/index' +import type { IJsonItem, INodeData, ITaskData } from '../types' + +export function useDvc({ + projectCode, + from = 0, + readonly, + data +}: { + projectCode: number + from?: number + readonly?: boolean + data?: ITaskData +}) { + const model = reactive({ + name: '', + taskType: 'MLFLOW', + flag: 'YES', + description: '', + timeoutFlag: false, + localParams: [], + environmentCode: null, + failRetryInterval: 1, + failRetryTimes: 0, + workerGroup: 'default', + delayTime: 0, + timeout: 30, + timeoutNotifyStrategy: ['WARN'], + dvcTaskType: 'Upload', + } as INodeData) + + let extra: IJsonItem[] = [] + if (from === 1) { + extra = [ + Fields.useTaskType(model, readonly), + Fields.useProcessName({ + model, + projectCode, + isCreate: !data?.id, + from, + processName: data?.processName + }) + ] + } + + return { + json: [ + Fields.useName(from), + ...extra, + Fields.useRunFlag(), + Fields.useDescription(), + Fields.useTaskPriority(), + Fields.useWorkerGroup(), + Fields.useEnvironmentName(model, !model.id), + ...Fields.useTaskGroup(model, projectCode), + ...Fields.useFailed(), + Fields.useDelayTime(model), + ...Fields.useTimeoutAlarm(model), + ...Fields.useDvc(model), + Fields.usePreTasks() + ] as IJsonItem[], + model + } +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index 9535c76d37..7ceca70723 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -341,6 +341,13 @@ interface ITaskParams { zk?: string zkPath?: string executeMode?: string + dvcTaskType?: string + dvcRepository?: string + dvcVersion?: string + dvcDataLocation?: string + dvcMessage?: string + dvcLoadSaveDataPath?: string + dvcStoreUrl?: string } interface INodeData diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts index 26cd32f544..ebd3f741e1 100644 --- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts +++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts @@ -38,6 +38,7 @@ export type TaskType = | 'JUPYTER' | 'MLFLOW' | 'OPENMLDB' + | 'DVC' export const TASK_TYPES_MAP = { SHELL: { @@ -118,5 +119,9 @@ export const TASK_TYPES_MAP = { OPENMLDB: { alias: 'OPENMLDB', helperLinkDisable: true + }, + DVC: { + alias: 'DVC', + helperLinkDisable: true } } as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } } diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss index bbe539d813..656e5ae890 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss +++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss @@ -170,6 +170,9 @@ $bgLight: #ffffff; &.icon-openmldb { background-image: url('/images/task-icons/openmldb.png'); } + &.icon-dvc { + background-image: url('/images/task-icons/dvc.png'); + } } &:hover { @@ -243,6 +246,9 @@ $bgLight: #ffffff; &.icon-openmldb { background-image: url('/images/task-icons/openmldb_hover.png'); } + &.icon-dvc { + background-image: url('/images/task-icons/dvc_hover.png'); + } } } }