diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index e6400e79fa..eaca6707d4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Queue; +import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.Tenant; @@ -557,6 +558,28 @@ public class PythonGateway { return result; } + /** + * Get resource by given resource type and full name. It return map contain resource id, name. + * Useful in Python API create task which need processDefinition information. + * + * @param userName user who query resource + * @param fullName full name of the resource + */ + public Map queryResourcesFileInfo(String userName, String fullName) { + Map result = new HashMap<>(); + User user = usersService.queryUser(userName); + Result resourceResponse = resourceService.queryResource(user, fullName, null, ResourceType.FILE); + if (resourceResponse.getCode() != Status.SUCCESS.getCode()) { + String msg = String.format("Can not find valid resource by name %s", fullName); + logger.error(msg); + throw new IllegalArgumentException(msg); + } + Resource resource = (Resource) resourceResponse.getData(); + result.put("id", resource.getId()); + result.put("name", resource.getFullName()); + return result; + } + @PostConstruct public void init() { if (pythonGatewayConfiguration.getEnabled()) { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java index 7d8b6efabc..34ff2b6753 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java @@ -17,13 +17,20 @@ package org.apache.dolphinscheduler.api.python; +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.ResourcesService; +import org.apache.dolphinscheduler.api.service.UsersService; +import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -53,6 +60,12 @@ public class PythonGatewayTest { @Mock private TaskDefinitionMapper taskDefinitionMapper; + @Mock + private ResourcesService resourcesService; + + @Mock + private UsersService usersService; + @Test public void testGetCodeAndVersion() throws CodeGenerateUtils.CodeGenerateException { Project project = getTestProject(); @@ -83,6 +96,37 @@ public class PythonGatewayTest { Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode()); } + + @Test + public void testQueryResourcesFileInfo() { + User user = getTestUser(); + Mockito.when(usersService.queryUser(user.getUserName())).thenReturn(user); + + Result mockResult = new Result<>(); + mockResult.setCode(Status.SUCCESS.getCode()); + Resource resource = getTestResource(); + mockResult.setData(resource); + Mockito.when(resourcesService.queryResource(user, resource.getFullName(), null, ResourceType.FILE)).thenReturn(mockResult); + + Map result = pythonGateway.queryResourcesFileInfo(user.getUserName(), resource.getFullName()); + Assert.assertEquals((int) result.get("id"), resource.getId()); + } + + private Resource getTestResource() { + Resource resource = new Resource(); + resource.setId(1); + resource.setType(ResourceType.FILE); + resource.setFullName("/dev/test.py"); + return resource; + } + + private User getTestUser() { + User user = new User(); + user.setId(1); + user.setUserName("ut-user"); + return user; + } + private Project getTestProject() { Project project = new Project(); project.setName("ut-project"); diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index 262469c88f..a5089ac165 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -100,3 +100,9 @@ class Time(str): FMT_STD_TIME = "%H:%M:%S" FMT_NO_COLON_TIME = "%H%M%S" + + +class ResourceKey(str): + """Constants for key of resource.""" + + ID = "id" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index cef01706df..5f34b6db60 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -405,6 +405,7 @@ class ProcessDefinition(Base): json.dumps(self.task_relation_json), json.dumps(self.task_definition_json), None, + None, ) return self._process_definition_code diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index 08a1cdf7e8..90c0e89087 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -22,6 +22,7 @@ from typing import Dict, List, Optional, Sequence, Set, Tuple, Union from pydolphinscheduler.constants import ( Delimiter, + ResourceKey, TaskFlag, TaskPriority, TaskTimeoutFlag, @@ -155,7 +156,7 @@ class Task(Base): # Attribute for task param self.local_params = local_params or [] - self.resource_list = resource_list or [] + self._resource_list = resource_list or [] self.dependence = dependence or {} self.wait_start_timeout = wait_start_timeout or {} self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT @@ -170,6 +171,22 @@ class Task(Base): """Set attribute process_definition.""" self._process_definition = process_definition + @property + def resource_list(self) -> List: + """Get task define attribute `resource_list`.""" + resources = set() + for resource in self._resource_list: + if type(resource) == str: + resources.add(self.query_resource(resource).get(ResourceKey.ID)) + elif type(resource) == dict and resource.get(ResourceKey.ID) is not None: + logger.warning( + """`resource_list` should be defined using List[str] with resource paths, + the use of ids to define resources will be remove in version 3.2.0. + """ + ) + resources.add(resource.get(ResourceKey.ID)) + return [{ResourceKey.ID: r} for r in resources] + @property def condition_result(self) -> Dict: """Get attribute condition_result.""" @@ -278,3 +295,10 @@ class Task(Base): # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT) # gateway_result_checker(result) return result.get("code"), result.get("version") + + def query_resource(self, full_name): + """Get resource info from java gateway, contains resource id, name.""" + gateway = launch_gateway() + return gateway.entry_point.queryResourcesFileInfo( + self.process_definition.user.name, full_name + ) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py index 7d4bbebdd3..65555c1eb5 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py @@ -54,7 +54,7 @@ TEST_TASK_RELATION_SIZE = 0 }, { "localParams": ["foo", "bar"], - "resourceList": ["foo", "bar"], + "resourceList": [{"id": 1}], "dependence": {"foo", "bar"}, "waitStartTimeout": {"foo", "bar"}, "conditionResult": {"foo": ["bar"]}, @@ -62,7 +62,11 @@ TEST_TASK_RELATION_SIZE = 0 ), ], ) -def test_property_task_params(attr, expect): +@patch( + "pydolphinscheduler.core.task.Task.query_resource", + return_value=({"id": 1, "name": "foo"}), +) +def test_property_task_params(mock_resource, attr, expect): """Test class task property.""" task = testTask( "test-property-task-params", @@ -241,3 +245,34 @@ def test_add_duplicate(caplog): re.findall("already in process definition", caplog.text), ] ) + + +@pytest.mark.parametrize( + "resources, expect", + [ + ( + ["/dev/test.py"], + [{"id": 1}], + ), + ( + ["/dev/test.py", {"id": 2}], + [{"id": 1}, {"id": 2}], + ), + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +@patch( + "pydolphinscheduler.core.task.Task.query_resource", + return_value=({"id": 1, "name": "/dev/test.py"}), +) +def test_python_resource_list(mock_code_version, mock_resource, resources, expect): + """Test python task resource list.""" + task = Task( + name="python_resource_list.", + task_type="PYTHON", + resource_list=resources, + ) + assert task.resource_list == expect