
[Feature][Task] Transfer files between tasks (#12552)

* add data transfer between tasks

* add delete DATA_TRANSFER API

* convert Result to DeleteDataTransferResponse

* add api UT

* fix final

* fix doc
3.2.0-release
JieguangZhou 2 years ago committed by GitHub
parent commit a47b4d0672
Changed files (number of changed lines in parentheses):

  1. docs/configs/docsdev.js (8)
  2. docs/docs/en/guide/parameter/file-parameter.md (102)
  3. docs/docs/zh/guide/parameter/file-parameter.md (101)
  4. docs/img/new_ui/dev/parameter/file_parameter_input_dir.png (BIN)
  5. docs/img/new_ui/dev/parameter/file_parameter_input_file.png (BIN)
  6. docs/img/new_ui/dev/parameter/file_parameter_output.png (BIN)
  7. dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java (20)
  8. dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/DeleteDataTransferResponse.java (33)
  9. dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java (9)
  10. dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java (62)
  11. dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java (48)
  12. dolphinscheduler-bom/pom.xml (8)
  13. dolphinscheduler-dist/release-docs/LICENSE (1)
  14. dolphinscheduler-dist/release-docs/licenses/LICENSE-zt-zip.txt (202)
  15. dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DataType.java (2)
  16. dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-custom-params.ts (4)
  17. dolphinscheduler-worker/pom.xml (5)
  18. dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java (5)
  19. dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java (270)
  20. dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java (255)
  21. tools/dependencies/known-dependencies.txt (1)

docs/configs/docsdev.js (8)

@@ -238,6 +238,10 @@ export default {
          title: 'Parameter Priority',
          link: '/en-us/docs/dev/user_doc/guide/parameter/priority.html',
        },
        {
          title: 'File Parameter',
          link: '/en-us/docs/dev/user_doc/guide/parameter/file-parameter.html',
        },
      ],
    },
    {
@@ -890,6 +894,10 @@ export default {
          title: '参数优先级',
          link: '/zh-cn/docs/dev/user_doc/guide/parameter/priority.html',
        },
        {
          title: '文件参数传递',
          link: '/zh-cn/docs/dev/user_doc/guide/parameter/file-parameter.html',
        },
      ],
    },
    {

docs/docs/en/guide/parameter/file-parameter.md (102)

@@ -0,0 +1,102 @@
# FILE Parameter

Use the file parameter to pass files (or folders; both are referred to as **files** below) from the working directory of an upstream task to downstream tasks in the same workflow instance. Typical scenarios:

- In an ETL workflow, pass data files produced by several upstream tasks to one downstream task.
- In a machine learning workflow, pass the dataset file produced by an upstream data-preparation task to a downstream model-training task.
## Usage

### Configure file parameter

To configure a file parameter, click the plus sign to the right of "Custom Parameters" on the task definition page.
### Output file to downstream task

**The four options of a custom parameter are:**

- Parameter name: the identifier used when passing files between tasks, such as `KEY1` and `KEY2` in the figure below
- Direction: OUT, meaning the file is output to downstream tasks
- Parameter type: FILE, indicating a file parameter
- Parameter value: the output file path, such as `data` and `data/test2/text.txt` in the figure below

The configuration in the figure below indicates that the `output` task passes two files to downstream tasks:

- The folder `data`, identified as `dir-data`; downstream tasks can get this folder through `output.dir-data`
- The file `data/test2/text.txt`, identified as `file-text`; downstream tasks can get this file through `output.file-text`
![img.png](../../../../img/new_ui/dev/parameter/file_parameter_output.png)
### Get the file from the upstream task

**The four options of a custom parameter are:**

- Parameter name: the location where the upstream file is saved after input, such as `input_dir` in the figure below
- Direction: IN, meaning the file is fetched from an upstream task
- Parameter type: FILE, indicating a file parameter
- Parameter value: the identifier of the upstream file, in the format `taskName.KEY`; for example `output.dir-data` in the figure below, where `output` is the name of the upstream task and `dir-data` is the identifier of a file output by that task

The configuration in the figure below indicates that the task gets the folder identified as `dir-data` from the upstream task `output` and saves it as `input_dir`.
![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_dir.png)
The configuration in the figure below indicates that the task gets the file identified as `file-text` from the upstream task `output` and saves it as `input.txt`.
![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_file.png)
## Other

### Note

- File transfer between upstream and downstream tasks uses the resource center as an intermediary: the data is saved under the `DATA_TRANSFER` directory of the resource center. Therefore **the resource center must be enabled**; see [Resource Center Configuration Details](../resource/configuration.md). Otherwise the file parameter function cannot be used.
- The file naming rule is `DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName` (see the example path after this list)
- If the transferred data is a folder, it is packed into a compressed file with the suffix `.zip` before upload; the downstream task unpacks it into the corresponding directory after download
- To delete file data, remove the corresponding folder under the `DATA_TRANSFER` directory of the resource center. Deleting a date subdirectory removes all file data for that date. You can also use the [Open API](../open-api.md) endpoint (`resources/data-transfer`) to delete file data older than N days.
- Given a task chain task1->task2->task3, the downstream task task3 can also get the file data of task1
- One-to-many and many-to-one transfers are supported
- Frequently transferring a large number of files will inevitably affect system I/O performance in proportion to the amount of data transferred
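
For example, under the rule above, the folder `data` output by a task named `output` might land at a path like the following (all IDs here are illustrative; the `_ds_pack.zip` suffix is added because folders are packed before upload):

```
DATA_TRANSFER/20220101/123456/1_789/output_1001_data_ds_pack.zip
```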
### Example

You can save the following YAML file locally and then execute `pydolphinscheduler yaml -f data-transfer.yaml` to run the demo.

```yaml
# Define the workflow
workflow:
  name: "data-transfer"
  run: true

# Define the tasks under the workflow
tasks:
  - name: output
    task_type: Shell
    command: |
      mkdir -p data/test1 data/test2
      echo "test1 message" >> data/test1/text.txt
      echo "test2 message" >> data/test2/text.txt
      tree .
    local_params:
      - { "prop": "dir-data", "direct": "OUT", "type": "FILE", "value": "data" }
      - { "prop": "file-text", "direct": "OUT", "type": "FILE", "value": "data/test2/text.txt" }
  - name: input_dir
    task_type: Shell
    deps: [output]
    command: |
      tree .
      cat input_dir/test1/text.txt
      cat input_dir/test2/text.txt
    local_params:
      - { "prop": "input_dir", "direct": "IN", "type": "FILE", "value": "output.dir-data" }
  - name: input_file
    task_type: Shell
    deps: [output]
    command: |
      tree .
      cat input.txt
    local_params:
      - { "prop": "input.txt", "direct": "IN", "type": "FILE", "value": "output.file-text" }
```

docs/docs/zh/guide/parameter/file-parameter.md (101)

@@ -0,0 +1,101 @@
# File Parameter

By configuring file parameters, files (or folders; both are referred to as **files** below) in the working directory of an upstream task can be passed to downstream tasks in the same workflow instance. Typical scenarios:

- In an ETL workflow, pass data files produced by several upstream tasks to one downstream task.
- In a machine learning workflow, pass the dataset file produced by an upstream data-preparation task to a downstream model-training task.

## Usage

### Configure file parameter

To configure a file parameter, click the plus sign to the right of "Custom Parameters" on the task definition page.

### Output file to downstream task

**The four options of a custom parameter are:**

- Parameter name: the identifier used when passing files between tasks, such as `KEY1` and `KEY2` in the figure below
- Direction: OUT, meaning the file is output to downstream tasks
- Parameter type: FILE, indicating a file parameter
- Parameter value: the output file path, such as `data` and `data/test2/text.txt` in the figure below

The configuration in the figure below indicates that the `output` task passes two files to downstream tasks:

- The folder `data`, identified as `dir-data`; downstream tasks can get this folder through `output.dir-data`
- The file `data/test2/text.txt`, identified as `file-text`; downstream tasks can get this file through `output.file-text`

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_output.png)

### Get the file from the upstream task

**The four options of a custom parameter are:**

- Parameter name: the location where the upstream file is saved after input, such as `input_dir` in the figure below
- Direction: IN, meaning the file is fetched from an upstream task
- Parameter type: FILE, indicating a file parameter
- Parameter value: the identifier of the upstream file, in the format `taskName.KEY`; for example `output.dir-data` in the figure below, where `output` is the name of the upstream task and `dir-data` is the identifier of a file output by that task

The configuration in the figure below indicates that the task gets the folder identified as `dir-data` from the upstream task `output` and saves it as `input_dir`.

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_dir.png)

The configuration in the figure below indicates that the task gets the file identified as `file-text` from the upstream task `output` and saves it as `input.txt`.

![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_file.png)

## Other

### Note

- File transfer between upstream and downstream tasks uses the resource center as an intermediary: the data is saved under the `DATA_TRANSFER` directory of the resource center. Therefore **the resource center must be enabled**; see [Resource Center Configuration Details](../resource/configuration.md). Otherwise the file parameter function cannot be used.
- The file naming rule is `DATA_TRANSFER/date/workflow code/workflow version_workflow instance ID/task name_task instance ID_file name`
- If the transferred data is a folder, it is packed into a compressed file with the suffix `.zip` before upload; the downstream task unpacks it into the corresponding directory after download
- To delete file data, remove the corresponding folder under the `DATA_TRANSFER` directory of the resource center. Deleting a date subdirectory removes all file data for that date. You can also use the `resources/data-transfer` [Open API](../open-api.md) endpoint (deletes data older than N days) to delete the corresponding file data.
- Given a task chain task1->task2->task3, the downstream task task3 can also get the file data of task1
- One-to-many and many-to-one transfers are supported
- Frequently transferring a large number of files will inevitably affect system I/O performance in proportion to the amount of data transferred

### Example

You can save the following YAML file locally and then execute `pydolphinscheduler yaml -f data-transfer.yaml` to run the demo.

```yaml
# Define the workflow
workflow:
  name: "data-transfer"
  run: true

# Define the tasks under the workflow
tasks:
  - name: output
    task_type: Shell
    command: |
      mkdir -p data/test1 data/test2
      echo "test1 message" >> data/test1/text.txt
      echo "test2 message" >> data/test2/text.txt
      tree .
    local_params:
      - { "prop": "dir-data", "direct": "OUT", "type": "FILE", "value": "data" }
      - { "prop": "file-text", "direct": "OUT", "type": "FILE", "value": "data/test2/text.txt" }
  - name: input_dir
    task_type: Shell
    deps: [output]
    command: |
      tree .
      cat input_dir/test1/text.txt
      cat input_dir/test2/text.txt
    local_params:
      - { "prop": "input_dir", "direct": "IN", "type": "FILE", "value": "output.dir-data" }
  - name: input_file
    task_type: Shell
    deps: [output]
    command: |
      tree .
      cat input.txt
    local_params:
      - { "prop": "input.txt", "direct": "IN", "type": "FILE", "value": "output.file-text" }
```

docs/img/new_ui/dev/parameter/file_parameter_input_dir.png (BIN)

Binary file not shown (new image, 72 KiB).

docs/img/new_ui/dev/parameter/file_parameter_input_file.png (BIN)

Binary file not shown (new image, 53 KiB).

docs/img/new_ui/dev/parameter/file_parameter_output.png (BIN)

Binary file not shown (new image, 121 KiB).

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java (20)

@@ -42,6 +42,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.VIEW_RESOURCE_FILE_ON
import static org.apache.dolphinscheduler.api.enums.Status.VIEW_UDF_FUNCTION_ERROR;

import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ResourcesService;
import org.apache.dolphinscheduler.api.service.UdfFuncService;

@@ -270,6 +271,25 @@ public class ResourcesController extends BaseController {
        return resourceService.delete(loginUser, fullName, tenantCode);
    }
    /**
     * delete DATA_TRANSFER data
     *
     * @param loginUser login user
     * @return delete result code
     */
    @Operation(summary = "deleteDataTransferData", description = "Delete DATA_TRANSFER data from more than N days ago")
    @Parameters({
            @Parameter(name = "days", description = "N days ago", required = true, schema = @Schema(implementation = Integer.class))
    })
    @DeleteMapping(value = "/data-transfer")
    @ResponseStatus(HttpStatus.OK)
    @ApiException(DELETE_RESOURCE_ERROR)
    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
    public DeleteDataTransferResponse deleteDataTransferData(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                                             @RequestParam(value = "days") Integer days) {
        return resourceService.deleteDataTransferData(loginUser, days);
    }
    /**
     * verify resource by alias and type
     *
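
For reference, here is a minimal client-side sketch of calling this new endpoint with `java.net.http.HttpClient`. This is not part of the PR; the host, port, and the `token` auth header value are assumptions to adapt to your deployment.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DataTransferCleanupClient {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // DELETE /resources/data-transfer?days=7 asks the API to remove
        // DATA_TRANSFER data dated 7 or more days ago.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:12345/dolphinscheduler/resources/data-transfer?days=7"))
                .header("token", "<your-access-token>") // assumed auth header
                .DELETE()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // The body is a DeleteDataTransferResponse serialized as JSON (successList/failedList).
        System.out.println(response.statusCode() + ": " + response.body());
    }
}
```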

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/DeleteDataTransferResponse.java (33)

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.resources;

import org.apache.dolphinscheduler.api.utils.Result;

import java.util.List;

import lombok.Data;

@Data
public class DeleteDataTransferResponse extends Result {

    private List<String> successList;

    private List<String> failedList;

}
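
Serialized to JSON, a successful response from the endpoint would look roughly like this (values are illustrative; the `code` and `msg` fields are inherited from `Result`):

```json
{
  "code": 0,
  "msg": "success",
  "successList": ["DATA_TRANSFER/20220101", "DATA_TRANSFER/20220102"],
  "failedList": []
}
```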

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java (9)

@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.dao.entity.Resource;
@@ -230,6 +231,14 @@ public interface ResourcesService {
     */
    Resource queryResourcesFileInfo(String userName, String fullName);

    /**
     * delete DATA_TRANSFER data in resource center
     *
     * @param loginUser user who queries the resource
     * @param days number of days
     */
    DeleteDataTransferResponse deleteDataTransferData(User loginUser, Integer days);

    /**
     * unauthorized file
     *

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java (62)

@@ -26,6 +26,7 @@ import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.constants.Constants.JAR;
import static org.apache.dolphinscheduler.common.constants.Constants.PERIOD;

import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter;
import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor;

@@ -71,9 +72,12 @@ import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.IOException;
import java.rmi.ServerException;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

@@ -1871,6 +1875,64 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesService {
        return (Resource) resourceResponse.getData();
    }
    @Override
    public DeleteDataTransferResponse deleteDataTransferData(User loginUser, Integer days) {
        DeleteDataTransferResponse result = new DeleteDataTransferResponse();

        User user = userMapper.selectById(loginUser.getId());
        if (user == null) {
            logger.error("user {} not exists", loginUser.getId());
            putMsg(result, Status.USER_NOT_EXIST, loginUser.getId());
            return result;
        }

        Tenant tenant = tenantMapper.queryById(user.getTenantId());
        if (tenant == null) {
            logger.error("tenant not exists");
            putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
            return result;
        }

        String tenantCode = tenant.getTenantCode();
        String baseFolder = storageOperate.getResourceFileName(tenantCode, "DATA_TRANSFER");

        // folders under DATA_TRANSFER are named yyyyMMdd; compute the cutoff date string
        LocalDateTime now = LocalDateTime.now();
        now = now.minus(days, ChronoUnit.DAYS);
        String deleteDate = now.toLocalDate().toString().replace("-", "");

        List<StorageEntity> storageEntities;
        try {
            storageEntities = new ArrayList<>(
                    storageOperate.listFilesStatus(baseFolder, baseFolder, tenantCode, ResourceType.FILE));
        } catch (Exception e) {
            logger.error("delete data transfer data error", e);
            putMsg(result, Status.DELETE_RESOURCE_ERROR);
            return result;
        }

        List<String> successList = new ArrayList<>();
        List<String> failList = new ArrayList<>();

        for (StorageEntity storageEntity : storageEntities) {
            File path = new File(storageEntity.getFullName());
            String date = path.getName();
            // lexicographic comparison works because the folder names are yyyyMMdd
            if (date.compareTo(deleteDate) <= 0) {
                try {
                    storageOperate.delete(storageEntity.getFullName(), true);
                    successList.add(storageEntity.getFullName());
                } catch (Exception ex) {
                    logger.error("delete data transfer data {} error, please delete it manually", date, ex);
                    failList.add(storageEntity.getFullName());
                }
            }
        }

        result.setSuccessList(successList);
        result.setFailedList(failList);
        putMsg(result, Status.SUCCESS);
        return result;
    }
    /**
     * unauthorized file
     *
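
As a standalone illustration of the cutoff logic above (a minimal sketch, not project code): because the folders under `DATA_TRANSFER` are named `yyyyMMdd`, comparing folder names lexicographically against the formatted cutoff date is equivalent to comparing the dates themselves.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class CutoffDemo {

    public static void main(String[] args) {
        int days = 3;
        // Same result as LocalDateTime.now().minus(days, DAYS).toLocalDate().toString().replace("-", "")
        String deleteDate = LocalDate.now().minusDays(days).format(DateTimeFormatter.BASIC_ISO_DATE);
        for (String folder : new String[]{"20220101", "20220105"}) {
            // delete every folder dated on or before the cutoff
            System.out.println(folder + " -> " + (folder.compareTo(deleteDate) <= 0 ? "delete" : "keep"));
        }
    }
}
```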

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java (48)

@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import static org.mockito.ArgumentMatchers.eq;

import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;

@@ -46,6 +47,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections.CollectionUtils;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

@@ -876,6 +878,52 @@ public class ResourcesServiceTest {
        Assertions.assertTrue(CollectionUtils.isNotEmpty(resources));
    }
    @Test
    public void testDeleteDataTransferData() throws Exception {
        User user = getUser();
        Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser());
        Mockito.when(tenantMapper.queryById(user.getTenantId())).thenReturn(getTenant());

        StorageEntity storageEntity1 = Mockito.mock(StorageEntity.class);
        StorageEntity storageEntity2 = Mockito.mock(StorageEntity.class);
        StorageEntity storageEntity3 = Mockito.mock(StorageEntity.class);
        StorageEntity storageEntity4 = Mockito.mock(StorageEntity.class);
        StorageEntity storageEntity5 = Mockito.mock(StorageEntity.class);
        Mockito.when(storageEntity1.getFullName()).thenReturn("DATA_TRANSFER/20220101");
        Mockito.when(storageEntity2.getFullName()).thenReturn("DATA_TRANSFER/20220102");
        Mockito.when(storageEntity3.getFullName()).thenReturn("DATA_TRANSFER/20220103");
        Mockito.when(storageEntity4.getFullName()).thenReturn("DATA_TRANSFER/20220104");
        Mockito.when(storageEntity5.getFullName()).thenReturn("DATA_TRANSFER/20220105");

        List<StorageEntity> storageEntityList = new ArrayList<>();
        storageEntityList.add(storageEntity1);
        storageEntityList.add(storageEntity2);
        storageEntityList.add(storageEntity3);
        storageEntityList.add(storageEntity4);
        storageEntityList.add(storageEntity5);

        Mockito.when(storageOperate.listFilesStatus(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
                .thenReturn(storageEntityList);

        LocalDateTime localDateTime = LocalDateTime.of(2022, 1, 5, 0, 0, 0);
        try (MockedStatic<LocalDateTime> mockHook = Mockito.mockStatic(LocalDateTime.class)) {
            mockHook.when(LocalDateTime::now).thenReturn(localDateTime);
            DeleteDataTransferResponse response = resourcesService.deleteDataTransferData(user, 3);

            Assertions.assertEquals(response.getSuccessList().size(), 2);
            Assertions.assertEquals(response.getSuccessList().get(0), "DATA_TRANSFER/20220101");
            Assertions.assertEquals(response.getSuccessList().get(1), "DATA_TRANSFER/20220102");
        }

        try (MockedStatic<LocalDateTime> mockHook = Mockito.mockStatic(LocalDateTime.class)) {
            mockHook.when(LocalDateTime::now).thenReturn(localDateTime);
            DeleteDataTransferResponse response = resourcesService.deleteDataTransferData(user, 0);
            Assertions.assertEquals(response.getSuccessList().size(), 5);
        }
    }
    @Test
    public void testCatFile() {

dolphinscheduler-bom/pom.xml (8)

@@ -99,6 +99,7 @@
        <datasync.version>2.17.282</datasync.version>
        <springdoc-openapi-ui.version>1.6.9</springdoc-openapi-ui.version>
        <aspectj.version>1.9.7</aspectj.version>
        <zt-zip.version>1.15</zt-zip.version>
    </properties>

    <dependencyManagement>

@@ -771,6 +772,13 @@
                <artifactId>springdoc-openapi-ui</artifactId>
                <version>${springdoc-openapi-ui.version}</version>
            </dependency>

            <dependency>
                <groupId>org.zeroturnaround</groupId>
                <artifactId>zt-zip</artifactId>
                <version>${zt-zip.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

dolphinscheduler-dist/release-docs/LICENSE (1)

@@ -498,6 +498,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
sdk-core 2.17.282: https://mvnrepository.com/artifact/software.amazon.awssdk/sdk-core/2.17.282, Apache 2.0
third-party-jackson-core 2.17.282: https://mvnrepository.com/artifact/software.amazon.awssdk/third-party-jackson-core/2.17.282, Apache 2.0
utils 2.17.282: https://mvnrepository.com/artifact/software.amazon.awssdk/utils/2.17.282, Apache 2.0
zt-zip 1.15: https://github.com/zeroturnaround/zt-zip/blob/master/LICENSE, Apache 2.0

========================================================================
BSD licenses

dolphinscheduler-dist/release-docs/licenses/LICENSE-zt-zip.txt (202)

New vendored license file: the standard Apache License, Version 2.0 text (http://www.apache.org/licenses/), ending with the appendix notice "Copyright 2012 ZeroTurnaround LLC."

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DataType.java (2)

@@ -33,5 +33,5 @@ public enum DataType {
     * 8 Boolean
     * 9 list <String>
     */
    VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, BOOLEAN, LIST, FILE
}

dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-custom-params.ts (4)

@@ -143,6 +143,10 @@ export const TYPE_LIST = [
  {
    value: 'LIST',
    label: 'LIST'
  },
  {
    value: 'FILE',
    label: 'FILE'
  }
]

dolphinscheduler-worker/pom.xml (5)

@@ -101,6 +101,11 @@
            <artifactId>spring-cloud-starter-kubernetes-fabric8-config</artifactId>
        </dependency>

        <dependency>
            <groupId>org.zeroturnaround</groupId>
            <artifactId>zt-zip</artifactId>
        </dependency>
    </dependencies>

    <build>

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java (5)

@@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.storage.StorageOperate;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;

@@ -221,6 +222,8 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
            TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger);
            logger.info("Resources:{} check success", taskExecutionContext.getResources());

            // download the files passed from upstream tasks before the task plugin runs
            TaskFilesTransferUtils.downloadUpstreamFiles(taskExecutionContext, storageOperate);

            TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
            if (null == taskChannel) {
                throw new TaskPluginException(String.format("%s task plugin not found, please check config file.",

@@ -261,6 +264,8 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
        taskExecutionContext.setProcessId(task.getProcessId());
        taskExecutionContext.setAppIds(task.getAppIds());
        taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));

        // upload output files and rewrite the "OUT FILE" properties in the var pool
        TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext, storageOperate);

        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
        logger.info("Send task execute result to master, the current task status: {}",

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java (270)

@@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.utils;

import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.service.storage.StorageOperate;

import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.IOException;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeroturnaround.zip.ZipUtil;

import com.fasterxml.jackson.databind.JsonNode;

public class TaskFilesTransferUtils {

    protected static final Logger logger = LoggerFactory
            .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class));

    // tmp path in local path for transfer
    static final String DOWNLOAD_TMP = ".DT_TMP";

    // suffix of the package file
    static final String PACK_SUFFIX = "_ds_pack.zip";

    // root path in resource storage
    static final String RESOURCE_TAG = "DATA_TRANSFER";

    private TaskFilesTransferUtils() {
        throw new IllegalStateException("Utility class");
    }
    /**
     * upload output files to resource storage
     *
     * @param taskExecutionContext is the context of task
     * @param storageOperate is the storage operate
     * @throws TaskException TaskException
     */
    public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext,
                                         StorageOperate storageOperate) throws TaskException {
        List<Property> varPools = getVarPools(taskExecutionContext);

        // get map of varPools for quick search
        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));

        // get "OUT FILE" parameters
        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);

        if (localParamsProperty.isEmpty()) {
            return;
        }

        logger.info("Upload output files ...");
        for (Property property : localParamsProperty) {
            // get local file path (a directory is packed into a zip first)
            String srcPath =
                    packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));

            // get remote file path
            String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
            try {
                // upload file to storage
                String resourceWholePath =
                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
                logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
                storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
            } catch (IOException ex) {
                throw new TaskException("Upload file to storage error", ex);
            }

            // update varPool: reuse the property if it is already there, otherwise add it
            Property oriProperty;
            if (varPoolsMap.containsKey(property.getProp())) {
                oriProperty = varPoolsMap.get(property.getProp());
            } else {
                oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
                varPools.add(oriProperty);
            }
            // rename the key to "taskName.prop" and point the value at the storage path
            oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
            oriProperty.setValue(resourcePath);
        }
        taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
    }
    /**
     * download upstream files from storage
     * only download files which are defined in the task parameters
     *
     * @param taskExecutionContext is the context of task
     * @param storageOperate is the storage operate
     * @throws TaskException task exception
     */
    public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext,
                                             StorageOperate storageOperate) {
        List<Property> varPools = getVarPools(taskExecutionContext);

        // get map of varPools for quick search
        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));

        // get "IN FILE" parameters
        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);

        if (localParamsProperty.isEmpty()) {
            return;
        }

        String executePath = taskExecutionContext.getExecutePath();
        // data path to download packaged data
        String downloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);

        logger.info("Download upstream files...");
        for (Property property : localParamsProperty) {
            Property inVarPool = varPoolsMap.get(property.getValue());
            if (inVarPool == null) {
                logger.error("{} not in {}", property.getValue(), varPoolsMap.keySet());
                throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
                        property.getValue()));
            }

            String resourcePath = inVarPool.getValue();
            String targetPath = String.format("%s/%s", executePath, property.getProp());

            String downloadPath;
            // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
            // targetPath
            boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
            if (isPack) {
                downloadPath = String.format("%s/%s", downloadTmpPath, new File(resourcePath).getName());
            } else {
                downloadPath = targetPath;
            }

            try {
                String resourceWholePath =
                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
                logger.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
                storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false,
                        true);
            } catch (IOException ex) {
                throw new TaskException("Download file from storage error", ex);
            }

            // unpack if the data is packaged
            if (isPack) {
                File downloadFile = new File(downloadPath);
                logger.info("Unpack {} to {}", downloadPath, targetPath);
                ZipUtil.unpack(downloadFile, new File(targetPath));
            }
        }

        // delete the DOWNLOAD_TMP folder if it exists
        try {
            org.apache.commons.io.FileUtils.deleteDirectory(new File(downloadTmpPath));
        } catch (IOException e) {
            logger.error("Delete DownloadTmpPath {} failed, this will not affect the task status", downloadTmpPath, e);
        }
    }
    /**
     * get local parameters property which type is FILE and direction is equal to direct
     *
     * @param taskExecutionContext is the context of task
     * @param direct may be Direct.IN or Direct.OUT.
     * @return List<Property>
     */
    public static List<Property> getFileLocalParams(TaskExecutionContext taskExecutionContext, Direct direct) {
        List<Property> localParamsProperty = new ArrayList<>();
        JsonNode taskParams = JSONUtils.parseObject(taskExecutionContext.getTaskParams());
        for (JsonNode localParam : taskParams.get("localParams")) {
            Property property = JSONUtils.parseObject(localParam.toString(), Property.class);
            if (property.getDirect().equals(direct) && property.getType().equals(DataType.FILE)) {
                localParamsProperty.add(property);
            }
        }
        return localParamsProperty;
    }
    /**
     * get resource path for managing files in storage
     *
     * @param taskExecutionContext is the context of task
     * @param fileName is the file name
     * @return resource path, RESOURCE_TAG/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
     */
    public static String getResourcePath(TaskExecutionContext taskExecutionContext, String fileName) {
        String date =
                DateUtils.formatTimeStamp(taskExecutionContext.getEndTime(), DateTimeFormatter.ofPattern("yyyyMMdd"));
        // resource folder: RESOURCE_TAG/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID
        String resourceFolder = String.format("%s/%s/%d/%d_%d", RESOURCE_TAG, date,
                taskExecutionContext.getProcessDefineCode(), taskExecutionContext.getProcessDefineVersion(),
                taskExecutionContext.getProcessInstanceId());
        // resource file: resourceFolder/TaskName_TaskInstanceID_FileName
        return String.format("%s/%s_%s_%s", resourceFolder, taskExecutionContext.getTaskName().replace(" ", "_"),
                taskExecutionContext.getTaskInstanceId(), fileName);
    }
    /**
     * get varPool from taskExecutionContext
     *
     * @param taskExecutionContext is the context of task
     * @return List<Property>
     */
    public static List<Property> getVarPools(TaskExecutionContext taskExecutionContext) {
        List<Property> varPools = new ArrayList<>();

        // get varPool
        String varPoolString = taskExecutionContext.getVarPool();
        if (StringUtils.isEmpty(varPoolString)) {
            return varPools;
        }
        // parse varPool
        for (JsonNode varPoolData : JSONUtils.parseArray(varPoolString)) {
            Property property = JSONUtils.parseObject(varPoolData.toString(), Property.class);
            varPools.add(property);
        }
        return varPools;
    }
    /**
     * If the path is a directory, pack it and return the path of the package
     *
     * @param path is the input path, may be a file or a directory
     * @return new path
     */
    public static String packIfDir(String path) throws TaskException {
        File file = new File(path);
        if (!file.exists()) {
            throw new TaskException(String.format("%s does not exist", path));
        }
        String newPath;
        if (file.isDirectory()) {
            newPath = file.getPath() + PACK_SUFFIX;
            logger.info("Pack {} to {}", path, newPath);
            ZipUtil.pack(file, new File(newPath));
        } else {
            newPath = path;
        }
        return newPath;
    }
}
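
To make the var-pool rewrite in `uploadOutputFiles` concrete, here is a before/after sketch for the doc's `output` task (workflow and instance IDs are illustrative): the property key is prefixed with the task name, the value is replaced by the storage path, and a folder value gains the `_ds_pack.zip` suffix from `packIfDir`.

```
// varPool before upload:
[{"prop":"dir-data","direct":"OUT","type":"FILE","value":"data"}]
// varPool after upload (illustrative IDs):
[{"prop":"output.dir-data","direct":"OUT","type":"FILE","value":"DATA_TRANSFER/20220101/123456/1_789/output_1001_data_ds_pack.zip"}]
```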

dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java (255)

@@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.utils;

import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.service.storage.StorageOperate;

import org.apache.curator.shaded.com.google.common.io.Files;

import java.io.File;
import java.io.IOException;
import java.time.format.DateTimeFormatter;
import java.util.List;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.zeroturnaround.zip.ZipUtil;

public class TaskFilesTransferUtilsTest {

    private final long processDefineCode = 123;

    private final int processDefineVersion = 456;

    private final int processInstanceId = 678;

    private final int taskInstanceId = 789;

    private final String taskName = "test";

    private final String tenantCode = "ubuntu";

    private long endTime;

    private String exceptTemplate;

    @BeforeEach
    void init() {
        endTime = System.currentTimeMillis();

        String date = DateUtils.formatTimeStamp(endTime, DateTimeFormatter.ofPattern("yyyyMMdd"));
        exceptTemplate = String.format("%s/%s/%d/%d_%d/%s_%d",
                TaskFilesTransferUtils.RESOURCE_TAG,
                date,
                processDefineCode,
                processDefineVersion,
                processInstanceId,
                taskName,
                taskInstanceId);
    }
    @Test
    void testUploadOutputFiles() throws IOException {
        File executePath = Files.createTempDir();
        File folderPath = new File(executePath, "data");
        File file = new File(folderPath.getPath() + "/test.txt");
        if (!(folderPath.mkdirs() && file.createNewFile())) {
            return;
        }

        String varPool = "[" +
                String.format("{\"prop\":\"folder\",\"direct\":\"OUT\",\"type\":\"FILE\",\"value\":\"%s\"},",
                        folderPath.getName())
                +
                String.format(" {\"prop\":\"file\",\"direct\":\"OUT\",\"type\":\"FILE\",\"value\":\"%s/%s\"},",
                        folderPath.getName(), file.getName())
                +
                "{\"prop\":\"a\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"a\"}," +
                "{\"prop\":\"b\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"b\"}" +
                "]";
        String taskParams = String.format("{\"localParams\": %s}", varPool);

        TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder()
                .varPool(varPool)
                .taskParams(taskParams)
                .processInstanceId(processInstanceId)
                .processDefineVersion(processDefineVersion)
                .processDefineCode(processDefineCode)
                .taskInstanceId(taskInstanceId)
                .taskName(taskName)
                .tenantCode(tenantCode)
                .executePath(executePath.toString())
                .endTime(endTime)
                .build();

        List<Property> oriProperties = TaskFilesTransferUtils.getVarPools(taskExecutionContext);

        StorageOperate storageOperate = Mockito.mock(StorageOperate.class);

        TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext, storageOperate);
        System.out.println(taskExecutionContext.getVarPool());

        String exceptFolder =
                String.format("%s_%s", exceptTemplate, folderPath.getName() + TaskFilesTransferUtils.PACK_SUFFIX);
        String exceptFile = String.format("%s_%s", exceptTemplate, file.getName());

        List<Property> properties = TaskFilesTransferUtils.getVarPools(taskExecutionContext);
        Assertions.assertEquals(4, properties.size());
        Assertions.assertEquals(String.format("%s.%s", taskName, "folder"), properties.get(0).getProp());
        Assertions.assertEquals(exceptFolder, properties.get(0).getValue());
        Assertions.assertEquals(String.format("%s.%s", taskName, "file"), properties.get(1).getProp());
        Assertions.assertEquals(exceptFile, properties.get(1).getValue());
        Assertions.assertEquals(oriProperties.get(2).getProp(), properties.get(2).getProp());
        Assertions.assertEquals(oriProperties.get(3).getValue(), properties.get(3).getValue());
    }
    @Test
    void testDownloadUpstreamFiles() {
        File executePath = Files.createTempDir();

        String folderPath = exceptTemplate + "_folder" + TaskFilesTransferUtils.PACK_SUFFIX;
        String filePath = exceptTemplate + "_file";
        String varPool = "[" +
                String.format(
                        "{\"prop\":\"task1.folder\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"%s\"},", folderPath)
                +
                String.format(" {\"prop\":\"task2.file\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"%s\"},",
                        filePath)
                +
                "{\"prop\":\"a\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"a\"}," +
                "{\"prop\":\"b\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"b\"}" +
                "]";
        String varPoolParams = "[" +
                "{\"prop\":\"folder\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"task1.folder\"}," +
                " {\"prop\":\"file\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"task2.file\"}" +
                "]";
        String taskParams = String.format("{\"localParams\": %s}", varPoolParams);

        TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder()
                .varPool(varPool)
                .taskParams(taskParams)
                .processInstanceId(processInstanceId)
                .processDefineVersion(processDefineVersion)
                .processDefineCode(processDefineCode)
                .taskInstanceId(taskInstanceId)
                .taskName(taskName)
                .tenantCode(tenantCode)
                .executePath(executePath.toString())
                .endTime(endTime)
                .build();

        StorageOperate storageOperate = Mockito.mock(StorageOperate.class);
        Mockito.mockStatic(ZipUtil.class);
        Assertions.assertDoesNotThrow(
                () -> TaskFilesTransferUtils.downloadUpstreamFiles(taskExecutionContext, storageOperate));
    }
    @Test
    void testGetFileLocalParams() {
        String taskParams = "{\"localParams\":[" +
                "{\"prop\":\"inputFile\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"task1.data\"}," +
                "{\"prop\":\"outputFile\",\"direct\":\"OUT\",\"type\":\"FILE\",\"value\":\"data\"}," +
                "{\"prop\":\"a\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"a\"}," +
                "{\"prop\":\"b\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"b\"}" +
                "]}";
        TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
        Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(taskParams);

        List<Property> fileLocalParamsIn = TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.IN);
        Assertions.assertEquals(1, fileLocalParamsIn.size());
        Assertions.assertEquals("inputFile", fileLocalParamsIn.get(0).getProp());
        Assertions.assertEquals("task1.data", fileLocalParamsIn.get(0).getValue());

        List<Property> fileLocalParamsOut = TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.OUT);
        Assertions.assertEquals(1, fileLocalParamsOut.size());
        Assertions.assertEquals("outputFile", fileLocalParamsOut.get(0).getProp());
        Assertions.assertEquals("data", fileLocalParamsOut.get(0).getValue());
    }
    @Test
    void testGetResourcePath() {
        String fileName = "test.txt";

        TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
        Mockito.when(taskExecutionContext.getEndTime()).thenReturn(endTime);
        Mockito.when(taskExecutionContext.getProcessDefineCode()).thenReturn(processDefineCode);
        Mockito.when(taskExecutionContext.getProcessDefineVersion()).thenReturn(processDefineVersion);
        Mockito.when(taskExecutionContext.getProcessInstanceId()).thenReturn(processInstanceId);
        Mockito.when(taskExecutionContext.getTaskInstanceId()).thenReturn(taskInstanceId);
        Mockito.when(taskExecutionContext.getTaskName()).thenReturn(taskName);

        String except = String.format("%s_%s", exceptTemplate, fileName);
        Assertions.assertEquals(except, TaskFilesTransferUtils.getResourcePath(taskExecutionContext, fileName));
    }
    @Test
    void testGetVarPools() {
        String varPoolsString = "[" +
                "{\"prop\":\"input\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"task1.output\"}" +
                ",{\"prop\":\"a\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${a}\"}" +
                "]";
        TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
        Mockito.when(taskExecutionContext.getVarPool()).thenReturn(varPoolsString);

        List<Property> varPools = TaskFilesTransferUtils.getVarPools(taskExecutionContext);
        Assertions.assertEquals(2, varPools.size());

        Property varPool0 = varPools.get(0);
        Assertions.assertEquals("input", varPool0.getProp());
        Assertions.assertEquals(Direct.IN, varPool0.getDirect());
        Assertions.assertEquals(DataType.FILE, varPool0.getType());
        Assertions.assertEquals("task1.output", varPool0.getValue());

        Property varPool1 = varPools.get(1);
        Assertions.assertEquals("a", varPool1.getProp());
        Assertions.assertEquals(Direct.IN, varPool1.getDirect());
        Assertions.assertEquals(DataType.VARCHAR, varPool1.getType());
        Assertions.assertEquals("${a}", varPool1.getValue());

        Mockito.when(taskExecutionContext.getVarPool()).thenReturn("[]");
        List<Property> varPoolsEmpty = TaskFilesTransferUtils.getVarPools(taskExecutionContext);
        Assertions.assertEquals(0, varPoolsEmpty.size());

        Mockito.when(taskExecutionContext.getVarPool()).thenReturn(null);
        List<Property> varPoolsNull = TaskFilesTransferUtils.getVarPools(taskExecutionContext);
        Assertions.assertEquals(0, varPoolsNull.size());
    }
    @Test
    void testPackIfDir() throws Exception {
        File folderPath = Files.createTempDir();
        File file1 = new File(folderPath.getPath() + "/test.txt");
        File file2 = new File(folderPath.getPath() + "/test.zip");
        boolean isSuccess1 = file1.createNewFile();
        boolean isSuccess2 = file2.createNewFile();
        Assertions.assertTrue(isSuccess1);
        Assertions.assertTrue(isSuccess2);

        Assertions.assertEquals(file1.getPath(), TaskFilesTransferUtils.packIfDir(file1.getPath()));
        Assertions.assertEquals(file2.getPath(), TaskFilesTransferUtils.packIfDir(file2.getPath()));

        String expectFolderPackPath = folderPath.getPath() + TaskFilesTransferUtils.PACK_SUFFIX;
        Assertions.assertEquals(expectFolderPackPath, TaskFilesTransferUtils.packIfDir(folderPath.getPath()));
    }
}

tools/dependencies/known-dependencies.txt (1)

@@ -338,6 +338,7 @@ woodstox-core-5.3.0.jar
xmlbeans-3.1.0.jar
zeppelin-client-0.10.1.jar
zeppelin-common-0.10.1.jar
zt-zip-1.15.jar
grpc-api-1.41.0.jar
grpc-context-1.41.0.jar
grpc-core-1.41.0.jar
