From 59cd86157ff8d5ab25f0b200d6a30abd95a64c03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=AE=B6=E5=90=8D?= <13774486042@163.com> Date: Tue, 12 Jul 2022 20:44:59 +0800 Subject: [PATCH] [Python] Supports creating or editing resources. (#10823) --- .../api/python/PythonGateway.java | 15 +++ .../api/service/ResourcesService.java | 24 +++++ .../service/impl/ResourcesServiceImpl.java | 92 +++++++++++++++++++ .../api/python/PythonGatewayTest.java | 20 ++++ .../api/service/ResourcesServiceTest.java | 60 +++++++++++- .../dolphinscheduler/common/Constants.java | 5 + .../core/process_definition.py | 14 +++ .../src/pydolphinscheduler/core/resource.py | 43 +++++++++ .../tests/core/test_process_definition.py | 9 +- .../tests/core/test_resource_definition.py | 38 ++++++++ 10 files changed, 318 insertions(+), 2 deletions(-) create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index fe74ff6018..ca8a107d9d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -547,6 +547,21 @@ public class PythonGateway { return result; } + /** + * create or update resource. + * If the folder is not already created, it will be + * + * @param userName user who create or update resource + * @param fullName The fullname of resource.Includes path and suffix. + * @param description description of resource + * @param resourceContent content of resource + * @return id of resource + */ + public Integer createOrUpdateResource( + String userName, String fullName, String description, String resourceContent) { + return resourceService.createOrUpdateResource(userName, fullName, description, resourceContent); + } + @PostConstruct public void init() { if (pythonGatewayConfiguration.getEnabled()) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java index ddecb2cd99..fb0fee5e17 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java @@ -168,6 +168,30 @@ public interface ResourcesService { */ Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory); + /** + * create or update resource. + * If the folder is not already created, it will be + * + * @param loginUser user who create or update resource + * @param fileFullName The full name of resource.Includes path and suffix. + * @param desc description of resource + * @param content content of resource + * @return create result code + */ + Result onlineCreateOrUpdateResourceWithDir(User loginUser, String fileFullName, String desc, String content); + + /** + * create or update resource. + * If the folder is not already created, it will be + * + * @param userName user who create or update resource + * @param fullName The fullname of resource.Includes path and suffix. + * @param description description of resource + * @param resourceContent content of resource + * @return id of resource + */ + Integer createOrUpdateResource(String userName, String fullName, String description, String resourceContent); + /** * updateProcessInstance resource * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index 79441bf858..2a5e83c096 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -85,10 +85,12 @@ import java.util.stream.Collectors; import static org.apache.dolphinscheduler.common.Constants.ALIAS; import static org.apache.dolphinscheduler.common.Constants.CONTENT; +import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING; import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR; import static org.apache.dolphinscheduler.common.Constants.FORMAT_SS; import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S; import static org.apache.dolphinscheduler.common.Constants.JAR; +import static org.apache.dolphinscheduler.common.Constants.PERIOD; /** * resources service impl @@ -1117,6 +1119,96 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe return result; } + /** + * create or update resource. + * If the folder is not already created, it will be + * + * @param loginUser user who create or update resource + * @param fileFullName The full name of resource.Includes path and suffix. + * @param desc description of resource + * @param content content of resource + * @return create result code + */ + @Override + @Transactional + public Result onlineCreateOrUpdateResourceWithDir(User loginUser, String fileFullName, String desc, String content) { + if (checkResourceExists(fileFullName, ResourceType.FILE.ordinal())) { + Resource resource = resourcesMapper.queryResource(fileFullName, ResourceType.FILE.ordinal()).get(0); + Result result = this.updateResourceContent(loginUser, resource.getId(), content); + if (result.getCode() == Status.SUCCESS.getCode()) { + resource.setDescription(desc); + Map resultMap = new HashMap<>(); + for (Map.Entry entry : new BeanMap(resource).entrySet()) { + if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) { + resultMap.put(entry.getKey().toString(), entry.getValue()); + } + } + result.setData(resultMap); + } + return result; + } else { + String resourceSuffix = fileFullName.substring(fileFullName.indexOf(PERIOD) + 1); + String fileNameWithSuffix = fileFullName.substring(fileFullName.lastIndexOf(FOLDER_SEPARATOR) + 1); + String resourceDir = fileFullName.replace(fileNameWithSuffix, EMPTY_STRING); + String resourceName = fileNameWithSuffix.replace(PERIOD + resourceSuffix, EMPTY_STRING); + String[] dirNames = resourceDir.split(FOLDER_SEPARATOR); + int pid = -1; + StringBuilder currDirPath = new StringBuilder(); + for (String dirName : dirNames) { + if (StringUtils.isNotEmpty(dirName)) { + pid = queryOrCreateDirId(loginUser, pid, currDirPath.toString(), dirName); + currDirPath.append(FOLDER_SEPARATOR).append(dirName); + } + } + return this.onlineCreateResource( + loginUser, ResourceType.FILE, resourceName, resourceSuffix, desc, content, pid, currDirPath.toString()); + } + } + + @Override + @Transactional + public Integer createOrUpdateResource(String userName, String fullName, String description, String resourceContent) { + User user = userMapper.queryByUserNameAccurately(userName); + int suffixLabelIndex = fullName.indexOf(PERIOD); + if (suffixLabelIndex == -1) { + String msg = String.format("The suffix of file can not be empty : %s", fullName); + logger.error(msg); + throw new IllegalArgumentException(msg); + } + if (!fullName.startsWith(FOLDER_SEPARATOR)) { + fullName = FOLDER_SEPARATOR + fullName; + } + Result createResult = onlineCreateOrUpdateResourceWithDir( + user, fullName, description, resourceContent); + if (createResult.getCode() == Status.SUCCESS.getCode()) { + Map resultMap = (Map) createResult.getData(); + return (int) resultMap.get("id"); + } + String msg = String.format("Can not create or update resource : %s", fullName); + logger.error(msg); + throw new IllegalArgumentException(msg); + } + + private int queryOrCreateDirId(User user, int pid, String currentDir, String dirName) { + String dirFullName = currentDir + FOLDER_SEPARATOR + dirName; + if (checkResourceExists(dirFullName, ResourceType.FILE.ordinal())) { + List resourceList = resourcesMapper.queryResource(dirFullName, ResourceType.FILE.ordinal()); + return resourceList.get(0).getId(); + } else { + // create dir + Result createDirResult = this.createDirectory( + user, dirName, EMPTY_STRING, ResourceType.FILE, pid, currentDir); + if (createDirResult.getCode() == Status.SUCCESS.getCode()) { + Map resultMap = (Map) createDirResult.getData(); + return (int) resultMap.get("id"); + } else { + String msg = String.format("Can not create dir %s", dirFullName); + logger.error(msg); + throw new IllegalArgumentException(msg); + } + } + } + private void permissionPostHandle(ResourceType resourceType, User loginUser, Integer resourceId) { AuthorizationType authorizationType = resourceType.equals(ResourceType.FILE) ? AuthorizationType.RESOURCE_FILE_ID : AuthorizationType.UDF_FILE; permissionPostHandle(authorizationType, loginUser.getId(), Collections.singletonList(resourceId), logger); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java index 34ff2b6753..022eb344a5 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java @@ -96,6 +96,26 @@ public class PythonGatewayTest { Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode()); } + @Test + public void testCreateResource() { + User user = getTestUser(); + String resourceDir = "/dir1/dir2/"; + String resourceName = "test"; + String resourceSuffix = "py"; + String desc = "desc"; + String content = "content"; + String resourceFullName = resourceDir + resourceName + "." + resourceSuffix; + + int resourceId = 3; + + Mockito.when(resourcesService.createOrUpdateResource(user.getUserName(), resourceFullName, desc, content)) + .thenReturn(resourceId); + + int id = pythonGateway.createOrUpdateResource( + user.getUserName(), resourceFullName, desc, content); + Assert.assertEquals(id, resourceId); + } + @Test public void testQueryResourcesFileInfo() { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java index af431c150e..359126688f 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java @@ -605,6 +605,64 @@ public class ResourcesServiceTest { } + @Test + public void testOnlineCreateResourceWithDir() { + User user = getUser(); + user.setId(1); + + String dir1Path = "/dir1"; + String dir2Path = "/dir2"; + String resourceDir = dir1Path + dir2Path; + String resourceName = "test"; + String resourceSuffix = "py"; + String desc = "desc"; + String content = "content"; + String fullName = resourceDir + "/" + resourceName + "." + resourceSuffix; + + Resource dir1 = new Resource(); + dir1.setFullName(dir1Path); + dir1.setId(1); + dir1.setUserId(user.getId()); + Resource dir2 = new Resource(); + dir2.setFullName(resourceDir); + dir2.setUserId(user.getId()); + Mockito.when(resourcesMapper.queryResource(dir1.getFullName(), ResourceType.FILE.ordinal())).thenReturn(Collections.singletonList(dir1)); + Mockito.when(resourcesMapper.queryResource(resourceDir, ResourceType.FILE.ordinal())).thenReturn(null); + PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck( + AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FILE_VIEW, serviceLogger)).thenReturn(true); + PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck( + AuthorizationType.RESOURCE_FILE_ID, new Object[]{dir1.getId()}, 1, serviceLogger)).thenReturn(true); + + Tenant tenant = getTenant(); + PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck( + AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FOLDER_ONLINE_CREATE, serviceLogger)).thenReturn(true); + PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck( + AuthorizationType.RESOURCE_FILE_ID, null, 1, serviceLogger)).thenReturn(true); + try { + PowerMockito.when(storageOperate.mkdir(tenant.getTenantCode(), null)).thenReturn(true); + } catch (IOException e) { + logger.error("storage error", e); + } + + PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck( + AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FILE_ONLINE_CREATE, serviceLogger)).thenReturn(true); + PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck( + AuthorizationType.RESOURCE_FILE_ID, null, 1, serviceLogger)).thenReturn(true); + PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); + PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck( + AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FILE_RENAME, serviceLogger)).thenReturn(true); + PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck( + AuthorizationType.RESOURCE_FILE_ID, null, 1, serviceLogger)).thenReturn(true); + Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); + Mockito.when(resourcesMapper.selectById(dir1.getId())).thenReturn(dir1); + Mockito.when(resourcesMapper.selectById(dir2.getId())).thenReturn(dir2); + Mockito.when(FileUtils.getUploadFilename(Mockito.anyString(), Mockito.anyString())).thenReturn("test"); + PowerMockito.when(FileUtils.writeContent2File(Mockito.anyString(), Mockito.anyString())).thenReturn(true); + + Result result = resourcesService.onlineCreateOrUpdateResourceWithDir(user, fullName, desc, content); + Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg()); + } + @Test public void testUpdateResourceContent() { PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); @@ -896,7 +954,7 @@ public class ResourcesServiceTest { return resource; } - private Resource getResource(int resourceId,ResourceType type) { + private Resource getResource(int resourceId, ResourceType type) { Resource resource = new Resource(); resource.setId(resourceId); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 6f253d99a4..1ea6bc0d8f 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -159,6 +159,11 @@ public final class Constants { */ public static final String COLON = ":"; + /** + * period . + */ + public static final String PERIOD = "."; + /** * QUESTION ? */ diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index 5f34b6db60..a8cf875785 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -63,6 +63,9 @@ class ProcessDefinition(Base): thought Web UI after it :func:`submit` or :func:`run`. It will create a new project belongs to ``user`` if it does not exists. And when ``project`` exists but project's create do not belongs to ``user``, will grant `project` to ``user`` automatically. + :param resource_list: Resource files required by the current process definition.You can create and modify + resource files from this field. When the process definition is submitted, these resource files are also + submitted along with it. """ # key attribute for identify ProcessDefinition object @@ -88,6 +91,7 @@ class ProcessDefinition(Base): "tasks", "task_definition_json", "task_relation_json", + "resource_list", } def __init__( @@ -107,6 +111,7 @@ class ProcessDefinition(Base): timeout: Optional[int] = 0, release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE, param: Optional[Dict] = None, + resource_list: Optional[List] = None, ): super().__init__(name, description) self.schedule = schedule @@ -132,6 +137,7 @@ class ProcessDefinition(Base): # TODO how to fix circle import self._task_relations: set["TaskRelation"] = set() # noqa: F821 self._process_definition_code = None + self.resource_list = resource_list or [] def __enter__(self) -> "ProcessDefinition": ProcessDefinitionContext.set(self) @@ -407,6 +413,14 @@ class ProcessDefinition(Base): None, None, ) + if len(self.resource_list) > 0: + for res in self.resource_list: + gateway.entry_point.createOrUpdateResource( + self._user, + res.name, + res.description, + res.content, + ) return self._process_definition_code def start(self) -> None: diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py new file mode 100644 index 0000000000..bd4ffd4bd7 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Module resource.""" + +from typing import Optional + +from pydolphinscheduler.core.base import Base + + +class Resource(Base): + """resource object, will define the resources that you want to create or update. + + :param name: The fullname of resource.Includes path and suffix. + :param content: The description of resource. + :param description: The description of resource. + """ + + _DEFINE_ATTR = {"name", "content", "description"} + + def __init__( + self, + name: str, + content: str, + description: Optional[str] = None, + ): + super().__init__(name, description) + self.content = content + self._resource_code = None diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py index 63580de467..5cb6dabdc0 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py @@ -18,7 +18,7 @@ """Test process definition.""" from datetime import datetime -from typing import Any +from typing import Any, List from unittest.mock import patch import pytest @@ -26,6 +26,7 @@ from freezegun import freeze_time from pydolphinscheduler.core import configuration from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.core.resource import Resource from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.side import Project, Tenant, User from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition @@ -90,6 +91,11 @@ def test_process_definition_default_value(name, value): ("warning_group_id", int, 1), ("timeout", int, 1), ("param", dict, {"key": "value"}), + ( + "resource_list", + List, + [Resource(name="/dev/test.py", content="hello world", description="desc")], + ), ], ) def test_set_attr(name, cls, expect): @@ -321,6 +327,7 @@ def test_process_definition_get_define_without_task(): "tasks": {}, "taskDefinitionJson": [{}], "taskRelationJson": [{}], + "resourceList": [], } with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: assert pd.get_define() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py new file mode 100644 index 0000000000..ebfb8936f8 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test resource definition.""" + +from pydolphinscheduler.core.resource import Resource + + +def test_resource(): + """Test resource set attributes which get with same type.""" + name = "/dev/test.py" + content = """print("hello world")""" + description = "hello world" + expect = { + "name": name, + "content": content, + "description": description, + } + resourceDefinition = Resource( + name=name, + content=content, + description=description, + ) + assert resourceDefinition.get_define() == expect