diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index 57e289137a..d0489cce62 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -24,7 +24,6 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.stream.Collectors; import javax.annotation.PostConstruct; @@ -572,19 +571,8 @@ public class PythonGateway { * @param userName user who query resource * @param fullName full name of the resource */ - public Map queryResourcesFileInfo(String userName, String fullName) { - Map result = new HashMap<>(); - User user = usersService.queryUser(userName); - Result resourceResponse = resourceService.queryResource(user, fullName, null, ResourceType.FILE); - if (resourceResponse.getCode() != Status.SUCCESS.getCode()) { - String msg = String.format("Can not find valid resource by name %s", fullName); - logger.error(msg); - throw new IllegalArgumentException(msg); - } - Resource resource = (Resource) resourceResponse.getData(); - result.put("id", resource.getId()); - result.put("name", resource.getFullName()); - return result; + public Resource queryResourcesFileInfo(String userName, String fullName) { + return resourceService.queryResourcesFileInfo(userName, fullName); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java index fb0fee5e17..00c6fa151b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.ProgramType; +import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.spi.enums.ResourceType; @@ -219,6 +220,15 @@ public interface ResourcesService { */ Map authorizeResourceTree(User loginUser, Integer userId); + /** + * Get resource by given resource type and full name. + * Useful in Python API create task which need processDefinition information. + * + * @param userName user who query resource + * @param fullName full name of the resource + */ + Resource queryResourcesFileInfo(String userName, String fullName); + /** * unauthorized file * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index 55fd2f275f..16e230a705 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -1448,6 +1448,17 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe return result; } + @Override + public Resource queryResourcesFileInfo(String userName, String fullName) { + User user = userMapper.queryByUserNameAccurately(userName); + Result resourceResponse = this.queryResource(user, fullName, null, ResourceType.FILE); + if (resourceResponse.getCode() != Status.SUCCESS.getCode()) { + String msg = String.format("Can not find valid resource by name %s", fullName); + throw new IllegalArgumentException(msg); + } + return (Resource) resourceResponse.getData(); + } + /** * unauthorized file * diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java index 022eb344a5..2a49ed307b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java @@ -17,10 +17,7 @@ package org.apache.dolphinscheduler.api.python; -import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.ResourcesService; -import org.apache.dolphinscheduler.api.service.UsersService; -import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Project; @@ -63,9 +60,6 @@ public class PythonGatewayTest { @Mock private ResourcesService resourcesService; - @Mock - private UsersService usersService; - @Test public void testGetCodeAndVersion() throws CodeGenerateUtils.CodeGenerateException { Project project = getTestProject(); @@ -120,16 +114,10 @@ public class PythonGatewayTest { @Test public void testQueryResourcesFileInfo() { User user = getTestUser(); - Mockito.when(usersService.queryUser(user.getUserName())).thenReturn(user); - - Result mockResult = new Result<>(); - mockResult.setCode(Status.SUCCESS.getCode()); Resource resource = getTestResource(); - mockResult.setData(resource); - Mockito.when(resourcesService.queryResource(user, resource.getFullName(), null, ResourceType.FILE)).thenReturn(mockResult); - - Map result = pythonGateway.queryResourcesFileInfo(user.getUserName(), resource.getFullName()); - Assert.assertEquals((int) result.get("id"), resource.getId()); + Mockito.when(resourcesService.queryResourcesFileInfo(user.getUserName(), resource.getFullName())).thenReturn(resource); + Resource result = pythonGateway.queryResourcesFileInfo(user.getUserName(), resource.getFullName()); + Assert.assertEquals(result.getId(), resource.getId()); } private Resource getTestResource() { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java index 961d5f7763..1579210064 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java @@ -716,6 +716,24 @@ public class ResourcesServiceTest { Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg()); } + @Test + public void testQueryResourcesFileInfo() { + User user = getUser(); + String userName = "test-user"; + Mockito.when(userMapper.queryByUserNameAccurately(userName)).thenReturn(user); + Resource file = new Resource(); + file.setFullName("/dir/file1.py"); + file.setId(1); + Mockito.when(resourcesMapper.queryResource(file.getFullName(), ResourceType.FILE.ordinal())) + .thenReturn(Collections.singletonList(file)); + PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck( + AuthorizationType.RESOURCE_FILE_ID, null, user.getId(), ApiFuncIdentificationConstant.FILE_VIEW, serviceLogger)).thenReturn(true); + PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck( + AuthorizationType.RESOURCE_FILE_ID, new Object[]{file.getId()}, user.getId(), serviceLogger)).thenReturn(true); + Resource result = resourcesService.queryResourcesFileInfo(userName, file.getFullName()); + Assert.assertEquals(file.getFullName(), result.getFullName()); + } + @Test public void testUpdateResourceContent() { PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index 31d1e6124b..1740beafdf 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -23,6 +23,7 @@ from typing import Any, Dict, List, Optional, Set from pydolphinscheduler import configuration from pydolphinscheduler.constants import TaskType +from pydolphinscheduler.core.resource import Resource from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException from pydolphinscheduler.java_gateway import launch_gateway from pydolphinscheduler.models import Base, Project, Tenant, User @@ -110,7 +111,7 @@ class ProcessDefinition(Base): timeout: Optional[int] = 0, release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE, param: Optional[Dict] = None, - resource_list: Optional[List] = None, + resource_list: Optional[List[Resource]] = None, ): super().__init__(name, description) self.schedule = schedule @@ -414,12 +415,8 @@ class ProcessDefinition(Base): ) if len(self.resource_list) > 0: for res in self.resource_list: - gateway.entry_point.createOrUpdateResource( - self._user, - res.name, - res.description, - res.content, - ) + res.user_name = self._user + res.create_or_update_resource() return self._process_definition_code def start(self) -> None: diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py index a3aab81d17..8015a72e39 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py @@ -19,6 +19,8 @@ from typing import Optional +from pydolphinscheduler.exceptions import PyDSParamException +from pydolphinscheduler.java_gateway import launch_gateway from pydolphinscheduler.models import Base @@ -28,16 +30,46 @@ class Resource(Base): :param name: The fullname of resource.Includes path and suffix. :param content: The description of resource. :param description: The description of resource. + :param user_name: The user name of resource. """ - _DEFINE_ATTR = {"name", "content", "description"} + _DEFINE_ATTR = {"name", "content", "description", "user_name"} def __init__( self, name: str, - content: str, + content: Optional[str] = None, description: Optional[str] = None, + user_name: Optional[str] = None, ): super().__init__(name, description) self.content = content + self.user_name = user_name self._resource_code = None + + def get_info_from_database(self): + """Get resource info from java gateway, contains resource id, name.""" + if not self.user_name: + raise PyDSParamException( + "`user_name` is required when querying resources from python gate." + ) + gateway = launch_gateway() + return gateway.entry_point.queryResourcesFileInfo(self.user_name, self.name) + + def get_id_from_database(self): + """Get resource id from java gateway.""" + return self.get_info_from_database().getId() + + def create_or_update_resource(self): + """Create or update resource via java gateway.""" + if not self.content or not self.user_name: + raise PyDSParamException( + "`user_name` and `content` are required when create or update resource from python gate." + ) + gateway = launch_gateway() + gateway.entry_point.createOrUpdateResource( + self.user_name, + self.name, + self.description, + self.content, + ) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index 4d4e67e323..b866ea221f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -32,6 +32,8 @@ from pydolphinscheduler.core.process_definition import ( ProcessDefinition, ProcessDefinitionContext, ) +from pydolphinscheduler.core.resource import Resource +from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.java_gateway import launch_gateway from pydolphinscheduler.models import Base @@ -175,18 +177,28 @@ class Task(Base): def resource_list(self) -> List: """Get task define attribute `resource_list`.""" resources = set() - for resource in self._resource_list: - if type(resource) == str: - resources.add(self.query_resource(resource).get(ResourceKey.ID)) - elif type(resource) == dict and resource.get(ResourceKey.ID) is not None: + for res in self._resource_list: + if type(res) == str: + resources.add( + Resource(name=res, user_name=self.user_name).get_id_from_database() + ) + elif type(res) == dict and res.get(ResourceKey.ID) is not None: logger.warning( """`resource_list` should be defined using List[str] with resource paths, the use of ids to define resources will be remove in version 3.2.0. """ ) - resources.add(resource.get(ResourceKey.ID)) + resources.add(res.get(ResourceKey.ID)) return [{ResourceKey.ID: r} for r in resources] + @property + def user_name(self) -> Optional[str]: + """Return user name of process definition.""" + if self.process_definition: + return self.process_definition.user.name + else: + raise PyDSParamException("`user_name` cannot be empty.") + @property def condition_result(self) -> Dict: """Get attribute condition_result.""" @@ -295,10 +307,3 @@ class Task(Base): # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT) # gateway_result_checker(result) return result.get("code"), result.get("version") - - def query_resource(self, full_name): - """Get resource info from java gateway, contains resource id, name.""" - gateway = launch_gateway() - return gateway.entry_point.queryResourcesFileInfo( - self.process_definition.user.name, full_name - ) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py index ebfb8936f8..07fcac3547 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py @@ -16,8 +16,10 @@ # under the License. """Test resource definition.""" +import pytest from pydolphinscheduler.core.resource import Resource +from pydolphinscheduler.exceptions import PyDSParamException def test_resource(): @@ -25,14 +27,42 @@ def test_resource(): name = "/dev/test.py" content = """print("hello world")""" description = "hello world" + user_name = "test_user" expect = { "name": name, "content": content, "description": description, + "userName": user_name, } resourceDefinition = Resource( - name=name, - content=content, - description=description, + name=name, content=content, description=description, user_name=user_name ) assert resourceDefinition.get_define() == expect + + +def test_empty_user_name(): + """Tests for the exception get info from database when the user name is null.""" + name = "/dev/test.py" + content = """print("hello world")""" + description = "hello world" + resourceDefinition = Resource(name=name, content=content, description=description) + with pytest.raises( + PyDSParamException, + match="`user_name` is required when querying resources from python gate.", + ): + resourceDefinition.get_info_from_database() + + +def test_empty_content(): + """Tests for the exception create or update resource when the user name or content is empty.""" + name = "/dev/test.py" + user_name = "test_user" + description = "hello world" + resourceDefinition = Resource( + name=name, description=description, user_name=user_name + ) + with pytest.raises( + PyDSParamException, + match="`user_name` and `content` are required when create or update resource from python gate.", + ): + resourceDefinition.create_or_update_resource() diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py index 65555c1eb5..201bdb30cd 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py @@ -63,10 +63,14 @@ TEST_TASK_RELATION_SIZE = 0 ], ) @patch( - "pydolphinscheduler.core.task.Task.query_resource", - return_value=({"id": 1, "name": "foo"}), + "pydolphinscheduler.core.resource.Resource.get_id_from_database", + return_value=1, ) -def test_property_task_params(mock_resource, attr, expect): +@patch( + "pydolphinscheduler.core.task.Task.user_name", + return_value="test_user", +) +def test_property_task_params(mock_resource, mock_user_name, attr, expect): """Test class task property.""" task = testTask( "test-property-task-params", @@ -265,10 +269,16 @@ def test_add_duplicate(caplog): return_value=(123, 1), ) @patch( - "pydolphinscheduler.core.task.Task.query_resource", - return_value=({"id": 1, "name": "/dev/test.py"}), + "pydolphinscheduler.core.resource.Resource.get_id_from_database", + return_value=1, +) +@patch( + "pydolphinscheduler.core.task.Task.user_name", + return_value="test_user", ) -def test_python_resource_list(mock_code_version, mock_resource, resources, expect): +def test_python_resource_list( + mock_code_version, mock_resource, mock_user_name, resources, expect +): """Test python task resource list.""" task = Task( name="python_resource_list.",