diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js index 60b038b3e7..5e44389f87 100644 --- a/docs/configs/docsdev.js +++ b/docs/configs/docsdev.js @@ -238,6 +238,10 @@ export default { title: 'Parameter Priority', link: '/en-us/docs/dev/user_doc/guide/parameter/priority.html', }, + { + title: 'File Parameter', + link: '/en-us/docs/dev/user_doc/guide/parameter/file-parameter.html', + }, ], }, { @@ -890,6 +894,10 @@ export default { title: '参数优先级', link: '/zh-cn/docs/dev/user_doc/guide/parameter/priority.html', }, + { + title: '文件参数传递', + link: '/zh-cn/docs/dev/user_doc/guide/parameter/file-parameter.html', + }, ], }, { diff --git a/docs/docs/en/guide/parameter/file-parameter.md b/docs/docs/en/guide/parameter/file-parameter.md new file mode 100644 index 0000000000..4ba9b30424 --- /dev/null +++ b/docs/docs/en/guide/parameter/file-parameter.md @@ -0,0 +1,102 @@ +# FILE Parameter + +Use the file parameter to pass files (or folders, hereinafter referred to as **files**) in the working directory of the upstream task to the downstream task in the same workflow instance. The following scenarios may be used + +- In the ETL task, pass the data files processed by multiple upstream tasks to a specific downstream task. +- In the machine learning scenario, pass the data set file of the upstream data preparation task to the downstream model training task. + +## Usage + +### Configure file parameter + +File parameter configuration method: click the plus sign on the right side of "Custom Parameters" on the task definition page to configure. 
+ +### Output file to downstream task + +**Four options of custom parameters are:** + +- Parameter name: the identifier used when passing tasks, such as `KEY1` and `KEY2` in the figure below +- Direction: OUT, which means outputting the file to the downstream task +- Parameter type: FILE, indicating file parameter +- Parameter value: output file path, such as `data` and `data/test2/text.txt` in the figure below + +The configuration in the figure below indicates that the `output` task passes two file data to the downstream task, respectively: + +- Pass out the folder `data`, and mark it as `dir-data`. The downstream task can get this folder through `output.dir-data` +- Pass out the file `data/test2/text.txt`, and mark it as `file-text`. The downstream task can get this file through `output.file-text` + +![img.png](../../../../img/new_ui/dev/parameter/file_parameter_output.png) + +### Get the file from the upstream task + +**Four options of custom parameters are:** + +- Parameter name: the position where the upstream file is saved after input, such as `input_dir` used in the figure below +- Direction: IN, which means to get the file from the upstream task +- Parameter type: FILE, indicating file parameter +- Parameter value: the identifier of the upstream file, in the format of `taskName.KEY`. 
For example, `output.dir-data` in the figure below, where `output` is the name of the upstream task, and `dir-data` is the file identifier output by the upstream task + +The configuration in the figure below indicates that the task gets the folder identified by `dir-data` from the upstream task `output` and saves it as `input_dir` + +![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_dir.png) + +The configuration in the figure below indicates that the task gets the file identified by `file-text` from the upstream task `output` and saves it as `input.txt` + +![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_file.png) + +## Other + +### Note + +- The file transfer between upstream and downstream tasks is based on the resource center as a transfer, and the data is saved in the `DATA_TRANSFER` directory of the resource center. Therefore, **the resource center function must be enabled**, please refer to [Resource Center Configuration Details](../resource/configuration.md) for details, otherwise the file parameter function cannot be used. +- The file naming rule is `DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName` +- If the transferred file data is a folder, it will be packaged into a compressed file with a suffix of `.zip` and uploaded. The downstream task will unzip and save it in the corresponding directory after receiving it +- If you need to delete the file data, you can delete the corresponding folder in the `DATA_TRANSFER` directory of the resource center. If you delete the date subdirectory directly, all the file data under that date will be deleted. You can also use the [Open API interface](../open-api.md) (`resources/data-transfer`) to delete the corresponding file data (delete data N days ago). 
+- If there is a task chain task1->task2->task3, then the downstream task task3 can also get the file data of task1 +- Support one-to-many transmission and many-to-one transmission +- If you frequently transfer a large number of files, it is obvious that the system IO performance will be affected by the amount of transferred data + +### Example + +You can save the following YAML file locally and then execute `pydolphinscheduler yaml -f data-transfer.yaml` to run the Demo. + +```yaml +# Define the workflow +workflow: + name: "data-transfer" + run: true + +# Define the tasks under the workflow +tasks: + - name: output + task_type: Shell + command: | + mkdir -p data/test1 data/test2 + echo "test1 message" >> data/test1/text.txt + echo "test2 message" >> data/test2/text.txt + tree . + local_params: + - { "prop": "dir-data", "direct": "OUT", "type": "FILE", "value": "data" } + - { "prop": "file-text", "direct": "OUT", "type": "FILE", "value": "data/test2/text.txt" } + + - name: input_dir + task_type: Shell + deps: [output] + command: | + tree . + cat input_dir/test1/text.txt + cat input_dir/test2/text.txt + local_params: + - { "prop": "input_dir", "direct": "IN", "type": "FILE", "value": "output.dir-data" } + + + - name: input_file + task_type: Shell + deps: [output] + command: | + tree . 
+ cat input.txt + local_params: + - { "prop": "input.txt", "direct": "IN", "type": "FILE", "value": "output.file-text" } +``` + diff --git a/docs/docs/zh/guide/parameter/file-parameter.md b/docs/docs/zh/guide/parameter/file-parameter.md new file mode 100644 index 0000000000..6ddfcbf44e --- /dev/null +++ b/docs/docs/zh/guide/parameter/file-parameter.md @@ -0,0 +1,101 @@ +# 文件参数 + +通过配置文件参数,在同一工作流实例中,可以将上游任务工作目录下的文件(或文件夹,下统一以**文件**代替)传递给下游任务。 如以下场景可能使用到 + +- 在ETL任务中,将多个上游任务处理好的数据文件一起传递给特定的下游任务。 +- 在机器学习场景中,将上游数据准备任务的数据集文件传递给下游模型训练任务。 + +## 使用方式 + +### 配置文件参数 + +文件参数配置方式如下:在任务定义页面,点击“自定义参数”右边的加号,即可进行配置。 + +### 输出文件给下游任务 + +**自定义参数四个选项分别为:** + +- 参数名:任务间传递时使用的标识,如下图中使用的`KEY1`和`KEY2` +- 方向:OUT, 则表示输出文件给下游任务 +- 参数类型:FILE, 表示文件参数 +- 参数值:输出的文件路径,如下图中的`data`和`data/test2/text.txt` + +下图的配置表示任务`output`向下游任务传递两个文件数据,分别为: +- 传出文件夹 `data`, 并标记为`dir-data`, 下游任务可以通过`output.dir-data`获取该文件夹 +- 传出文件 `data/test2/text.txt`, 并标记为`file-text`, 下游任务可以通过`output.file-text`获取该文件 + +![img.png](../../../../img/new_ui/dev/parameter/file_parameter_output.png) + +### 获取上游任务的文件 + +**自定义参数四个选项分别为:** + +- 参数名:上游文件输入后保存的位置,如下图中使用的`input_dir` +- 方向:IN, 则表示从上游任务获取文件 +- 参数类型:FILE, 表示文件参数 +- 参数值:上游文件的标识,为 `taskName.KEY` 的格式 如下图中的`output.dir-data`, 其中`output`为上游任务的名称,`dir-data`为上游任务中输出的文件标识 + +下图的配置表示任务从上游任务`output`中获取标识为`dir-data`的文件夹,并保存为`input_dir` + +![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_dir.png) + +下图的配置表示任务从上游任务`output`中获取标识为`file-text`的文件,并保存为`input.txt` + +![img.png](../../../../img/new_ui/dev/parameter/file_parameter_input_file.png) + +## 其他 + +### 备注 + +- 上下游任务间的文件传递基于资源中心作为中转,数据保存在资源中心`DATA_TRANSFER`的目录下, 因此**必须开启资源中心功能**,详情请参考[资源中心配置详情](../resource/configuration.md), 否则无法使用文件参数功能。 +- 文件命名规则为 `DATA_TRANSFER/日期/工作流Code/工作流版本_工作流实例ID/任务名称_任务实例ID_文件名` +- 若传输的文件数据为文件夹,则会打包成后缀为`.zip`的压缩文件再上传,下游任务接到后会解压并保存在对应目录 +- 若需要删除文件数据,可以在资源中心的`DATA_TRANSFER`目录下删除对应文件夹即可, 如直接按照日期子目录删除,会删除该日期下所有的文件数据. 
也可以使用`resources/data-transfer`[Open API 接口](../open-api.md)(删除N天前的数据)删除对应文件数据。 +- 如果存在任务链 task1->task2->task3, 则最下游任务task3也能获取task1的文件数据 +- 支持一对多传输以及多对一传输 +- 如果频繁大量传输文件,毫无疑问会因传输的数据量影响到系统IO性能 + +### 样例 + +你可以保存以下YAML文件到本地,然后执行`pydolphinscheduler yaml -f data-transfer.yaml`即可运行Demo. + +```yaml +# Define the workflow +workflow: + name: "data-transfer" + run: true + +# Define the tasks under the workflow +tasks: + - name: output + task_type: Shell + command: | + mkdir -p data/test1 data/test2 + echo "test1 message" >> data/test1/text.txt + echo "test2 message" >> data/test2/text.txt + tree . + local_params: + - { "prop": "dir-data", "direct": "OUT", "type": "FILE", "value": "data" } + - { "prop": "file-text", "direct": "OUT", "type": "FILE", "value": "data/test2/text.txt" } + + - name: input_dir + task_type: Shell + deps: [output] + command: | + tree . + cat input_dir/test1/text.txt + cat input_dir/test2/text.txt + local_params: + - { "prop": "input_dir", "direct": "IN", "type": "FILE", "value": "output.dir-data" } + + + - name: input_file + task_type: Shell + deps: [output] + command: | + tree . 
+ cat input.txt + local_params: + - { "prop": "input.txt", "direct": "IN", "type": "FILE", "value": "output.file-text" } +``` + diff --git a/docs/img/new_ui/dev/parameter/file_parameter_input_dir.png b/docs/img/new_ui/dev/parameter/file_parameter_input_dir.png new file mode 100644 index 0000000000..e510782585 Binary files /dev/null and b/docs/img/new_ui/dev/parameter/file_parameter_input_dir.png differ diff --git a/docs/img/new_ui/dev/parameter/file_parameter_input_file.png b/docs/img/new_ui/dev/parameter/file_parameter_input_file.png new file mode 100644 index 0000000000..3d2f13133c Binary files /dev/null and b/docs/img/new_ui/dev/parameter/file_parameter_input_file.png differ diff --git a/docs/img/new_ui/dev/parameter/file_parameter_output.png b/docs/img/new_ui/dev/parameter/file_parameter_output.png new file mode 100644 index 0000000000..48bece4f67 Binary files /dev/null and b/docs/img/new_ui/dev/parameter/file_parameter_output.png differ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java index ff65bfd94b..c1d0fbae5c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java @@ -42,6 +42,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.VIEW_RESOURCE_FILE_ON import static org.apache.dolphinscheduler.api.enums.Status.VIEW_UDF_FUNCTION_ERROR; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; +import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.ResourcesService; import org.apache.dolphinscheduler.api.service.UdfFuncService; @@ -270,6 +271,25 @@ public class 
ResourcesController extends BaseController { return resourceService.delete(loginUser, fullName, tenantCode); } + /** + * delete DATA_TRANSFER data + * + * @param loginUser login user + * @return delete result code + */ + @Operation(summary = "deleteDataTransferData", description = "Delete the N days ago data of DATA_TRANSFER ") + @Parameters({ + @Parameter(name = "days", description = "N days ago", required = true, schema = @Schema(implementation = Integer.class)) + }) + @DeleteMapping(value = "/data-transfer") + @ResponseStatus(HttpStatus.OK) + @ApiException(DELETE_RESOURCE_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public DeleteDataTransferResponse deleteDataTransferData(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "days") Integer days) { + return resourceService.deleteDataTransferData(loginUser, days); + } + /** * verify resource by alias and type * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/DeleteDataTransferResponse.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/DeleteDataTransferResponse.java new file mode 100644 index 0000000000..ab89ca510d --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/DeleteDataTransferResponse.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.dto.resources; + +import org.apache.dolphinscheduler.api.utils.Result; + +import java.util.List; + +import lombok.Data; + +@Data +public class DeleteDataTransferResponse extends Result { + + private List successList; + + private List failedList; + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java index c0c7a7d211..753e293a73 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service; +import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.dao.entity.Resource; @@ -230,6 +231,14 @@ public interface ResourcesService { */ Resource queryResourcesFileInfo(String userName, String fullName); + /** + * delete DATA_TRANSFER data in resource center + * + * @param loginUser user who query resource + * @param days number of days + */ + DeleteDataTransferResponse deleteDataTransferData(User loginUser, Integer days); + /** * unauthorized file * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index d82a0e4b7d..91aff2347f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -26,6 +26,7 @@ import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S; import static org.apache.dolphinscheduler.common.constants.Constants.JAR; import static org.apache.dolphinscheduler.common.constants.Constants.PERIOD; +import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse; import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter; import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor; @@ -71,9 +72,12 @@ import org.apache.commons.beanutils.BeanMap; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import java.io.File; import java.io.IOException; import java.rmi.ServerException; import java.text.MessageFormat; +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1871,6 +1875,64 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe return (Resource) resourceResponse.getData(); } + @Override + public DeleteDataTransferResponse deleteDataTransferData(User loginUser, Integer days) { + DeleteDataTransferResponse result = new DeleteDataTransferResponse(); + + User user = userMapper.selectById(loginUser.getId()); + if (user == null) { + logger.error("user {} not exists", loginUser.getId()); + putMsg(result, Status.USER_NOT_EXIST, loginUser.getId()); + return result; + } + + Tenant tenant = tenantMapper.queryById(user.getTenantId()); + if (tenant == null) { + 
logger.error("tenant not exists"); + putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST); + return result; + } + + String tenantCode = tenant.getTenantCode(); + + String baseFolder = storageOperate.getResourceFileName(tenantCode, "DATA_TRANSFER"); + + LocalDateTime now = LocalDateTime.now(); + now = now.minus(days, ChronoUnit.DAYS); + String deleteDate = now.toLocalDate().toString().replace("-", ""); + List storageEntities; + try { + storageEntities = new ArrayList<>( + storageOperate.listFilesStatus(baseFolder, baseFolder, tenantCode, ResourceType.FILE)); + } catch (Exception e) { + logger.error("delete data transfer data error", e); + putMsg(result, Status.DELETE_RESOURCE_ERROR); + return result; + } + + List successList = new ArrayList<>(); + List failList = new ArrayList<>(); + + for (StorageEntity storageEntity : storageEntities) { + File path = new File(storageEntity.getFullName()); + String date = path.getName(); + if (date.compareTo(deleteDate) <= 0) { + try { + storageOperate.delete(storageEntity.getFullName(), true); + successList.add(storageEntity.getFullName()); + } catch (Exception ex) { + logger.error("delete data transfer data {} error, please delete it manually", date, ex); + failList.add(storageEntity.getFullName()); + } + } + } + + result.setSuccessList(successList); + result.setFailedList(failList); + putMsg(result, Status.SUCCESS); + return result; + } + /** * unauthorized file * diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java index ecfbc72548..21f0a08d21 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service; import static 
org.mockito.ArgumentMatchers.eq; +import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl; @@ -46,6 +47,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.commons.collections.CollectionUtils; import java.io.IOException; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -876,6 +878,52 @@ public class ResourcesServiceTest { Assertions.assertTrue(CollectionUtils.isNotEmpty(resources)); } + @Test + public void testDeleteDataTransferData() throws Exception { + User user = getUser(); + Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser()); + Mockito.when(tenantMapper.queryById(user.getTenantId())).thenReturn(getTenant()); + + StorageEntity storageEntity1 = Mockito.mock(StorageEntity.class); + StorageEntity storageEntity2 = Mockito.mock(StorageEntity.class); + StorageEntity storageEntity3 = Mockito.mock(StorageEntity.class); + StorageEntity storageEntity4 = Mockito.mock(StorageEntity.class); + StorageEntity storageEntity5 = Mockito.mock(StorageEntity.class); + + Mockito.when(storageEntity1.getFullName()).thenReturn("DATA_TRANSFER/20220101"); + Mockito.when(storageEntity2.getFullName()).thenReturn("DATA_TRANSFER/20220102"); + Mockito.when(storageEntity3.getFullName()).thenReturn("DATA_TRANSFER/20220103"); + Mockito.when(storageEntity4.getFullName()).thenReturn("DATA_TRANSFER/20220104"); + Mockito.when(storageEntity5.getFullName()).thenReturn("DATA_TRANSFER/20220105"); + + List storageEntityList = new ArrayList<>(); + storageEntityList.add(storageEntity1); + storageEntityList.add(storageEntity2); + storageEntityList.add(storageEntity3); + storageEntityList.add(storageEntity4); + storageEntityList.add(storageEntity5); + + 
Mockito.when(storageOperate.listFilesStatus(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())) + .thenReturn(storageEntityList); + + LocalDateTime localDateTime = LocalDateTime.of(2022, 1, 5, 0, 0, 0); + try (MockedStatic mockHook = Mockito.mockStatic(LocalDateTime.class)) { + mockHook.when(LocalDateTime::now).thenReturn(localDateTime); + DeleteDataTransferResponse response = resourcesService.deleteDataTransferData(user, 3); + + Assertions.assertEquals(response.getSuccessList().size(), 2); + Assertions.assertEquals(response.getSuccessList().get(0), "DATA_TRANSFER/20220101"); + Assertions.assertEquals(response.getSuccessList().get(1), "DATA_TRANSFER/20220102"); + } + + try (MockedStatic mockHook = Mockito.mockStatic(LocalDateTime.class)) { + mockHook.when(LocalDateTime::now).thenReturn(localDateTime); + DeleteDataTransferResponse response = resourcesService.deleteDataTransferData(user, 0); + Assertions.assertEquals(response.getSuccessList().size(), 5); + } + + } + @Test public void testCatFile() { diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml index 668a9b0ada..f33032e880 100644 --- a/dolphinscheduler-bom/pom.xml +++ b/dolphinscheduler-bom/pom.xml @@ -99,6 +99,7 @@ 2.17.282 1.6.9 1.9.7 + 1.15 @@ -771,6 +772,13 @@ springdoc-openapi-ui ${springdoc-openapi-ui.version} + + + org.zeroturnaround + zt-zip + ${zt-zip.version} + + diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE index 84502593dc..fc1ae6aa65 100644 --- a/dolphinscheduler-dist/release-docs/LICENSE +++ b/dolphinscheduler-dist/release-docs/LICENSE @@ -498,6 +498,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. 
sdk-core 2.17.282: https://mvnrepository.com/artifact/software.amazon.awssdk/sdk-core/2.17.282, Apache 2.0 third-party-jackson-core 2.17.282: https://mvnrepository.com/artifact/software.amazon.awssdk/third-party-jackson-core/2.17.282, Apache 2.0 utils 2.17.282: https://mvnrepository.com/artifact/software.amazon.awssdk/utils/2.17.282, Apache 2.0 + zt-zip 1.15: https://github.com/zeroturnaround/zt-zip/blob/master/LICENSE, Apache 2.0 ======================================================================== BSD licenses diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-zt-zip.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-zt-zip.txt new file mode 100644 index 0000000000..c8491cd91a --- /dev/null +++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-zt-zip.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright 2012 ZeroTurnaround LLC. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DataType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DataType.java index 16e0a0aa65..34c26dae5a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DataType.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DataType.java @@ -33,5 +33,5 @@ public enum DataType { * 8 Boolean * 9 list */ - VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, BOOLEAN, LIST + VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, BOOLEAN, LIST, FILE } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-custom-params.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-custom-params.ts index fb9d098d45..2052f524fe 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-custom-params.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-custom-params.ts @@ -143,6 +143,10 @@ export const TYPE_LIST = [ { value: 'LIST', label: 'LIST' + }, + { + value: 'FILE', + label: 'FILE' } ] diff --git 
a/dolphinscheduler-worker/pom.xml b/dolphinscheduler-worker/pom.xml index 50a07e07ac..5386d78263 100644 --- a/dolphinscheduler-worker/pom.xml +++ b/dolphinscheduler-worker/pom.xml @@ -101,6 +101,11 @@ spring-cloud-starter-kubernetes-fabric8-config + + org.zeroturnaround + zt-zip + + diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java index fdbd2fd448..e4ef748eee 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java @@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils; +import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.storage.StorageOperate; import org.apache.dolphinscheduler.service.task.TaskPluginManager; @@ -221,6 +222,8 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger); logger.info("Resources:{} check success", taskExecutionContext.getResources()); + TaskFilesTransferUtils.downloadUpstreamFiles(taskExecutionContext, storageOperate); + TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()); if (null == taskChannel) { throw new TaskPluginException(String.format("%s task plugin not found, please check config file.", @@ 
-261,6 +264,8 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { taskExecutionContext.setProcessId(task.getProcessId()); taskExecutionContext.setAppIds(task.getAppIds()); taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool())); + // upload out files and modify the "OUT FILE" property in VarPool + TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext, storageOperate); workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT); logger.info("Send task execute result to master, the current task status: {}", diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java new file mode 100644 index 0000000000..9e3238a0d1 --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.worker.utils; + +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; +import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.service.storage.StorageOperate; + +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeroturnaround.zip.ZipUtil; + +import com.fasterxml.jackson.databind.JsonNode; + +public class TaskFilesTransferUtils { + + protected final static Logger logger = LoggerFactory + .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class)); + + // tmp path in local path for transfer + final static String DOWNLOAD_TMP = ".DT_TMP"; + + // suffix of the package file + final static String PACK_SUFFIX = "_ds_pack.zip"; + + // root path in resource storage + final static String RESOURCE_TAG = "DATA_TRANSFER"; + + private TaskFilesTransferUtils() { + throw new IllegalStateException("Utility class"); + } + + /** + * upload output files to resource storage + * + * @param taskExecutionContext is the context of task + * @param storageOperate is the storage operate + * @throws TaskException TaskException + */ + public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext, + StorageOperate storageOperate) throws TaskException { + List varPools = 
getVarPools(taskExecutionContext); + // get map of varPools for quick search + Map varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x)); + + // get OUTPUT FILE parameters + List localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT); + + if (localParamsProperty.isEmpty()) { + return; + } + + logger.info("Upload output files ..."); + for (Property property : localParamsProperty) { + // get local file path + String srcPath = + packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue())); + // get remote file path + String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName()); + try { + // upload file to storage + String resourceWholePath = + storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath); + logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath); + storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true); + } catch (IOException ex) { + throw new TaskException("Upload file to storage error", ex); + } + + // update varPool + Property oriProperty; + // if the property is not in varPool, add it + if (varPoolsMap.containsKey(property.getProp())) { + oriProperty = varPoolsMap.get(property.getProp()); + } else { + oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue()); + varPools.add(oriProperty); + } + oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp())); + oriProperty.setValue(resourcePath); + } + taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools)); + } + + /** + * download upstream files from storage + * only download files which are defined in the task parameters + * + * @param taskExecutionContext is the context of task + * @param storageOperate is the storage operate + * @throws TaskException task exception + */ + public static void 
downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) { + List varPools = getVarPools(taskExecutionContext); + // get map of varPools for quick search + Map varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x)); + + // get "IN FILE" parameters + List localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN); + + if (localParamsProperty.isEmpty()) { + return; + } + + String executePath = taskExecutionContext.getExecutePath(); + // data path to download packaged data + String downloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP); + + logger.info("Download upstream files..."); + for (Property property : localParamsProperty) { + Property inVarPool = varPoolsMap.get(property.getValue()); + if (inVarPool == null) { + logger.error("{} not in {}", property.getValue(), varPoolsMap.keySet()); + throw new TaskException(String.format("Can not find upstream file using %s, please check the key", + property.getValue())); + } + + String resourcePath = inVarPool.getValue(); + String targetPath = String.format("%s/%s", executePath, property.getProp()); + + String downloadPath; + // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the + // targetPath + boolean isPack = resourcePath.endsWith(PACK_SUFFIX); + if (isPack) { + downloadPath = String.format("%s/%s", downloadTmpPath, new File(resourcePath).getName()); + } else { + downloadPath = targetPath; + } + + try { + String resourceWholePath = + storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath); + logger.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath); + storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false, + true); + } catch (IOException ex) { + throw new TaskException("Download file from storage error", ex); + } + + // unpack if the data is packaged + if (isPack) { + File 
downloadFile = new File(downloadPath); + logger.info("Unpack {} to {}", downloadPath, targetPath); + ZipUtil.unpack(downloadFile, new File(targetPath)); + } + } + + // delete DownloadTmp Folder if DownloadTmpPath exists + try { + org.apache.commons.io.FileUtils.deleteDirectory(new File(downloadTmpPath)); + } catch (IOException e) { + logger.error("Delete DownloadTmpPath {} failed, this will not affect the task status", downloadTmpPath, e); + } + } + + /** + * get local parameters property which type is FILE and direction is equal to direct + * + * @param taskExecutionContext is the context of task + * @param direct may be Direct.IN or Direct.OUT. + * @return List + */ + public static List getFileLocalParams(TaskExecutionContext taskExecutionContext, Direct direct) { + List localParamsProperty = new ArrayList<>(); + JsonNode taskParams = JSONUtils.parseObject(taskExecutionContext.getTaskParams()); + for (JsonNode localParam : taskParams.get("localParams")) { + Property property = JSONUtils.parseObject(localParam.toString(), Property.class); + + if (property.getDirect().equals(direct) && property.getType().equals(DataType.FILE)) { + localParamsProperty.add(property); + } + } + return localParamsProperty; + } + + /** + * get Resource path for manage files in storage + * + * @param taskExecutionContext is the context of task + * @param fileName is the file name + * @return resource path, RESOURCE_TAG/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName + */ + public static String getResourcePath(TaskExecutionContext taskExecutionContext, String fileName) { + String date = + DateUtils.formatTimeStamp(taskExecutionContext.getEndTime(), DateTimeFormatter.ofPattern("yyyyMMdd")); + // get resource Folder: RESOURCE_TAG/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID + String resourceFolder = String.format("%s/%s/%d/%d_%d", RESOURCE_TAG, date, + taskExecutionContext.getProcessDefineCode(), 
taskExecutionContext.getProcessDefineVersion(), + taskExecutionContext.getProcessInstanceId()); + // get resource fileL: resourceFolder/TaskName_TaskInstanceID_FileName + return String.format("%s/%s_%s_%s", resourceFolder, taskExecutionContext.getTaskName().replace(" ", "_"), + taskExecutionContext.getTaskInstanceId(), fileName); + } + + /** + * get varPool from taskExecutionContext + * + * @param taskExecutionContext is the context of task + * @return List + */ + public static List getVarPools(TaskExecutionContext taskExecutionContext) { + List varPools = new ArrayList<>(); + + // get varPool + String varPoolString = taskExecutionContext.getVarPool(); + if (StringUtils.isEmpty(varPoolString)) { + return varPools; + } + // parse varPool + for (JsonNode varPoolData : JSONUtils.parseArray(varPoolString)) { + Property property = JSONUtils.parseObject(varPoolData.toString(), Property.class); + varPools.add(property); + } + return varPools; + } + + /** + * If the path is a directory, pack it and return the path of the package + * + * @param path is the input path, may be a file or a directory + * @return new path + */ + public static String packIfDir(String path) throws TaskException { + File file = new File(path); + if (!file.exists()) { + throw new TaskException(String.format("%s dose not exists", path)); + } + String newPath; + if (file.isDirectory()) { + newPath = file.getPath() + PACK_SUFFIX; + logger.info("Pack {} to {}", path, newPath); + ZipUtil.pack(file, new File(newPath)); + } else { + newPath = path; + } + return newPath; + } +} diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java new file mode 100644 index 0000000000..94a6a36ae1 --- /dev/null +++ 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.utils; + +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; +import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.service.storage.StorageOperate; + +import org.apache.curator.shaded.com.google.common.io.Files; + +import java.io.File; +import java.io.IOException; +import java.time.format.DateTimeFormatter; +import java.util.List; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.zeroturnaround.zip.ZipUtil; + +public class TaskFilesTransferUtilsTest { + + private final long processDefineCode = 123; + private final int processDefineVersion = 456; + private final int processInstanceId = 678; + private final int 
taskInstanceId = 789; + private final String taskName = "test"; + + private final String tenantCode = "ubuntu"; + + private long endTime; + + private String exceptTemplate; + + @BeforeEach + void init() { + endTime = System.currentTimeMillis(); + String date = DateUtils.formatTimeStamp(endTime, DateTimeFormatter.ofPattern("yyyyMMdd")); + exceptTemplate = String.format("%s/%s/%d/%d_%d/%s_%d", + TaskFilesTransferUtils.RESOURCE_TAG, + date, + processDefineCode, + processDefineVersion, + processInstanceId, + taskName, + taskInstanceId); + } + + @Test + void testUploadOutputFiles() throws IOException { + File executePath = Files.createTempDir(); + File folderPath = new File(executePath, "data"); + File file = new File(folderPath.getPath() + "/test.txt"); + if (!(folderPath.mkdirs() && file.createNewFile())) { + return; + } + String varPool = "[" + + String.format("{\"prop\":\"folder\",\"direct\":\"OUT\",\"type\":\"FILE\",\"value\":\"%s\"},", + folderPath.getName()) + + + String.format(" {\"prop\":\"file\",\"direct\":\"OUT\",\"type\":\"FILE\",\"value\":\"%s/%s\"},", + folderPath.getName(), file.getName()) + + + "{\"prop\":\"a\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"a\"}," + + "{\"prop\":\"b\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"b\"}" + + "]"; + String taskParams = String.format("{\"localParams\": %s}", varPool); + TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder() + .varPool(varPool) + .taskParams(taskParams) + .processInstanceId(processInstanceId) + .processDefineVersion(processDefineVersion) + .processDefineCode(processDefineCode) + .taskInstanceId(taskInstanceId) + .taskName(taskName) + .tenantCode(tenantCode) + .executePath(executePath.toString()) + .endTime(endTime) + .build(); + + List oriProperties = TaskFilesTransferUtils.getVarPools(taskExecutionContext); + + StorageOperate storageOperate = Mockito.mock(StorageOperate.class); + TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext, storageOperate); + 
System.out.println(taskExecutionContext.getVarPool()); + + String exceptFolder = + String.format("%s_%s", exceptTemplate, folderPath.getName() + TaskFilesTransferUtils.PACK_SUFFIX); + String exceptFile = String.format("%s_%s", exceptTemplate, file.getName()); + + List properties = TaskFilesTransferUtils.getVarPools(taskExecutionContext); + Assertions.assertEquals(4, properties.size()); + + Assertions.assertEquals(String.format("%s.%s", taskName, "folder"), properties.get(0).getProp()); + Assertions.assertEquals(exceptFolder, properties.get(0).getValue()); + + Assertions.assertEquals(String.format("%s.%s", taskName, "file"), properties.get(1).getProp()); + Assertions.assertEquals(exceptFile, properties.get(1).getValue()); + + Assertions.assertEquals(oriProperties.get(2).getProp(), properties.get(2).getProp()); + Assertions.assertEquals(oriProperties.get(3).getValue(), properties.get(3).getValue()); + + } + + @Test + void testDownloadUpstreamFiles() { + File executePath = Files.createTempDir(); + String folderPath = exceptTemplate + "_folder" + TaskFilesTransferUtils.PACK_SUFFIX; + String filePath = exceptTemplate + "_file"; + String varPool = "[" + + String.format( + "{\"prop\":\"task1.folder\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"%s\"},", folderPath) + + + String.format(" {\"prop\":\"task2.file\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"%s\"},", + filePath) + + + "{\"prop\":\"a\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"a\"}," + + "{\"prop\":\"b\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"b\"}" + + "]"; + String varPoolParams = "[" + + "{\"prop\":\"folder\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"task1.folder\"}," + + " {\"prop\":\"file\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"task2.file\"}" + + "]"; + String taskParams = String.format("{\"localParams\": %s}", varPoolParams); + TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder() + .varPool(varPool) + .taskParams(taskParams) + 
.processInstanceId(processInstanceId) + .processDefineVersion(processDefineVersion) + .processDefineCode(processDefineCode) + .taskInstanceId(taskInstanceId) + .taskName(taskName) + .tenantCode(tenantCode) + .executePath(executePath.toString()) + .endTime(endTime) + .build(); + + StorageOperate storageOperate = Mockito.mock(StorageOperate.class); + Mockito.mockStatic(ZipUtil.class); + Assertions.assertDoesNotThrow( + () -> TaskFilesTransferUtils.downloadUpstreamFiles(taskExecutionContext, storageOperate)); + } + + @Test + void testGetFileLocalParams() { + String taskParams = "{\"localParams\":[" + + "{\"prop\":\"inputFile\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"task1.data\"}," + + "{\"prop\":\"outputFile\",\"direct\":\"OUT\",\"type\":\"FILE\",\"value\":\"data\"}," + + "{\"prop\":\"a\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"a\"}," + + "{\"prop\":\"b\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"b\"}" + + "]}"; + TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); + Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(taskParams); + + List fileLocalParamsIn = TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.IN); + Assertions.assertEquals(1, fileLocalParamsIn.size()); + Assertions.assertEquals("inputFile", fileLocalParamsIn.get(0).getProp()); + Assertions.assertEquals("task1.data", fileLocalParamsIn.get(0).getValue()); + + List fileLocalParamsOut = TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.OUT); + Assertions.assertEquals(1, fileLocalParamsOut.size()); + Assertions.assertEquals("outputFile", fileLocalParamsOut.get(0).getProp()); + Assertions.assertEquals("data", fileLocalParamsOut.get(0).getValue()); + + } + + @Test + void testGetResourcePath() { + String fileName = "test.txt"; + TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); + + Mockito.when(taskExecutionContext.getEndTime()).thenReturn(endTime); + + 
Mockito.when(taskExecutionContext.getProcessDefineCode()).thenReturn(processDefineCode); + Mockito.when(taskExecutionContext.getProcessDefineVersion()).thenReturn(processDefineVersion); + Mockito.when(taskExecutionContext.getProcessInstanceId()).thenReturn(processInstanceId); + Mockito.when(taskExecutionContext.getTaskInstanceId()).thenReturn(taskInstanceId); + Mockito.when(taskExecutionContext.getTaskName()).thenReturn(taskName); + + String except = String.format("%s_%s", exceptTemplate, fileName); + Assertions.assertEquals(except, TaskFilesTransferUtils.getResourcePath(taskExecutionContext, fileName)); + + } + + @Test + void testGetVarPools() { + String varPoolsString = "[" + + "{\"prop\":\"input\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"task1.output\"}" + + ",{\"prop\":\"a\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${a}\"}" + + "]"; + TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); + Mockito.when(taskExecutionContext.getVarPool()).thenReturn(varPoolsString); + + List varPools = TaskFilesTransferUtils.getVarPools(taskExecutionContext); + Assertions.assertEquals(2, varPools.size()); + + Property varPool0 = varPools.get(0); + Assertions.assertEquals("input", varPool0.getProp()); + Assertions.assertEquals(Direct.IN, varPool0.getDirect()); + Assertions.assertEquals(DataType.FILE, varPool0.getType()); + Assertions.assertEquals("task1.output", varPool0.getValue()); + + Property varPool1 = varPools.get(1); + Assertions.assertEquals("a", varPool1.getProp()); + Assertions.assertEquals(Direct.IN, varPool1.getDirect()); + Assertions.assertEquals(DataType.VARCHAR, varPool1.getType()); + Assertions.assertEquals("${a}", varPool1.getValue()); + + Mockito.when(taskExecutionContext.getVarPool()).thenReturn("[]"); + List varPoolsEmpty = TaskFilesTransferUtils.getVarPools(taskExecutionContext); + Assertions.assertEquals(0, varPoolsEmpty.size()); + + Mockito.when(taskExecutionContext.getVarPool()).thenReturn(null); + List 
varPoolsNull = TaskFilesTransferUtils.getVarPools(taskExecutionContext); + Assertions.assertEquals(0, varPoolsNull.size()); + + } + + @Test + void testPackIfDir() throws Exception { + File folderPath = Files.createTempDir(); + File file1 = new File(folderPath.getPath() + "/test.txt"); + File file2 = new File(folderPath.getPath() + "/test.zip"); + boolean isSuccess1 = file1.createNewFile(); + boolean isSuccess2 = file2.createNewFile(); + + Assertions.assertTrue(isSuccess1); + Assertions.assertTrue(isSuccess2); + + Assertions.assertEquals(file1.getPath(), TaskFilesTransferUtils.packIfDir(file1.getPath())); + Assertions.assertEquals(file2.getPath(), TaskFilesTransferUtils.packIfDir(file2.getPath())); + + String expectFolderPackPath = folderPath.getPath() + TaskFilesTransferUtils.PACK_SUFFIX; + Assertions.assertEquals(expectFolderPackPath, TaskFilesTransferUtils.packIfDir(folderPath.getPath())); + } +} diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 5309687598..73cc2d603b 100644 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -338,6 +338,7 @@ woodstox-core-5.3.0.jar xmlbeans-3.1.0.jar zeppelin-client-0.10.1.jar zeppelin-common-0.10.1.jar +zt-zip-1.15.jar grpc-api-1.41.0.jar grpc-context-1.41.0.jar grpc-core-1.41.0.jar