Browse Source

[Python] Migrate exists method call in PythonGateway to corresponding service (#11188)

Migrate all exists method call in PythonGateway to corresponding service
k8s/config
陈家名 2 years ago committed by GitHub
parent
commit
aef2fbf36c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  2. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  3. 11
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  4. 18
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
  5. 18
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  6. 11
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  7. 36
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
  8. 29
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
  9. 36
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py
  10. 22
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -24,7 +24,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
@ -572,19 +571,8 @@ public class PythonGateway {
* @param userName user who query resource
* @param fullName full name of the resource
*/
public Map<String, Object> queryResourcesFileInfo(String userName, String fullName) {
Map<String, Object> result = new HashMap<>();
User user = usersService.queryUser(userName);
Result<Object> resourceResponse = resourceService.queryResource(user, fullName, null, ResourceType.FILE);
if (resourceResponse.getCode() != Status.SUCCESS.getCode()) {
String msg = String.format("Can not find valid resource by name %s", fullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
Resource resource = (Resource) resourceResponse.getData();
result.put("id", resource.getId());
result.put("name", resource.getFullName());
return result;
public Resource queryResourcesFileInfo(String userName, String fullName) {
return resourceService.queryResourcesFileInfo(userName, fullName);
}
/**

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
@ -219,6 +220,15 @@ public interface ResourcesService {
*/
Map<String, Object> authorizeResourceTree(User loginUser, Integer userId);
/**
* Get resource by given resource type and full name.
* Useful in Python API create task which need processDefinition information.
*
* @param userName user who query resource
* @param fullName full name of the resource
*/
Resource queryResourcesFileInfo(String userName, String fullName);
/**
* unauthorized file
*

11
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -1448,6 +1448,17 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
@Override
public Resource queryResourcesFileInfo(String userName, String fullName) {
User user = userMapper.queryByUserNameAccurately(userName);
Result<Object> resourceResponse = this.queryResource(user, fullName, null, ResourceType.FILE);
if (resourceResponse.getCode() != Status.SUCCESS.getCode()) {
String msg = String.format("Can not find valid resource by name %s", fullName);
throw new IllegalArgumentException(msg);
}
return (Resource) resourceResponse.getData();
}
/**
* unauthorized file
*

18
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java

@ -17,10 +17,7 @@
package org.apache.dolphinscheduler.api.python;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ResourcesService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
@ -63,9 +60,6 @@ public class PythonGatewayTest {
@Mock
private ResourcesService resourcesService;
@Mock
private UsersService usersService;
@Test
public void testGetCodeAndVersion() throws CodeGenerateUtils.CodeGenerateException {
Project project = getTestProject();
@ -120,16 +114,10 @@ public class PythonGatewayTest {
@Test
public void testQueryResourcesFileInfo() {
User user = getTestUser();
Mockito.when(usersService.queryUser(user.getUserName())).thenReturn(user);
Result<Object> mockResult = new Result<>();
mockResult.setCode(Status.SUCCESS.getCode());
Resource resource = getTestResource();
mockResult.setData(resource);
Mockito.when(resourcesService.queryResource(user, resource.getFullName(), null, ResourceType.FILE)).thenReturn(mockResult);
Map<String, Object> result = pythonGateway.queryResourcesFileInfo(user.getUserName(), resource.getFullName());
Assert.assertEquals((int) result.get("id"), resource.getId());
Mockito.when(resourcesService.queryResourcesFileInfo(user.getUserName(), resource.getFullName())).thenReturn(resource);
Resource result = pythonGateway.queryResourcesFileInfo(user.getUserName(), resource.getFullName());
Assert.assertEquals(result.getId(), resource.getId());
}
private Resource getTestResource() {

18
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -716,6 +716,24 @@ public class ResourcesServiceTest {
Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
}
@Test
public void testQueryResourcesFileInfo() {
User user = getUser();
String userName = "test-user";
Mockito.when(userMapper.queryByUserNameAccurately(userName)).thenReturn(user);
Resource file = new Resource();
file.setFullName("/dir/file1.py");
file.setId(1);
Mockito.when(resourcesMapper.queryResource(file.getFullName(), ResourceType.FILE.ordinal()))
.thenReturn(Collections.singletonList(file));
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, null, user.getId(), ApiFuncIdentificationConstant.FILE_VIEW, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, new Object[]{file.getId()}, user.getId(), serviceLogger)).thenReturn(true);
Resource result = resourcesService.queryResourcesFileInfo(userName, file.getFullName());
Assert.assertEquals(file.getFullName(), result.getFullName());
}
@Test
public void testUpdateResourceContent() {
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);

11
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -23,6 +23,7 @@ from typing import Any, Dict, List, Optional, Set
from pydolphinscheduler import configuration
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
from pydolphinscheduler.java_gateway import launch_gateway
from pydolphinscheduler.models import Base, Project, Tenant, User
@ -110,7 +111,7 @@ class ProcessDefinition(Base):
timeout: Optional[int] = 0,
release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None,
resource_list: Optional[List] = None,
resource_list: Optional[List[Resource]] = None,
):
super().__init__(name, description)
self.schedule = schedule
@ -414,12 +415,8 @@ class ProcessDefinition(Base):
)
if len(self.resource_list) > 0:
for res in self.resource_list:
gateway.entry_point.createOrUpdateResource(
self._user,
res.name,
res.description,
res.content,
)
res.user_name = self._user
res.create_or_update_resource()
return self._process_definition_code
def start(self) -> None:

36
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py

@ -19,6 +19,8 @@
from typing import Optional
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.java_gateway import launch_gateway
from pydolphinscheduler.models import Base
@ -28,16 +30,46 @@ class Resource(Base):
:param name: The fullname of resource.Includes path and suffix.
:param content: The description of resource.
:param description: The description of resource.
:param user_name: The user name of resource.
"""
_DEFINE_ATTR = {"name", "content", "description"}
_DEFINE_ATTR = {"name", "content", "description", "user_name"}
def __init__(
self,
name: str,
content: str,
content: Optional[str] = None,
description: Optional[str] = None,
user_name: Optional[str] = None,
):
super().__init__(name, description)
self.content = content
self.user_name = user_name
self._resource_code = None
def get_info_from_database(self):
"""Get resource info from java gateway, contains resource id, name."""
if not self.user_name:
raise PyDSParamException(
"`user_name` is required when querying resources from python gate."
)
gateway = launch_gateway()
return gateway.entry_point.queryResourcesFileInfo(self.user_name, self.name)
def get_id_from_database(self):
"""Get resource id from java gateway."""
return self.get_info_from_database().getId()
def create_or_update_resource(self):
"""Create or update resource via java gateway."""
if not self.content or not self.user_name:
raise PyDSParamException(
"`user_name` and `content` are required when create or update resource from python gate."
)
gateway = launch_gateway()
gateway.entry_point.createOrUpdateResource(
self.user_name,
self.name,
self.description,
self.content,
)

29
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@ -32,6 +32,8 @@ from pydolphinscheduler.core.process_definition import (
ProcessDefinition,
ProcessDefinitionContext,
)
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.java_gateway import launch_gateway
from pydolphinscheduler.models import Base
@ -175,18 +177,28 @@ class Task(Base):
def resource_list(self) -> List:
"""Get task define attribute `resource_list`."""
resources = set()
for resource in self._resource_list:
if type(resource) == str:
resources.add(self.query_resource(resource).get(ResourceKey.ID))
elif type(resource) == dict and resource.get(ResourceKey.ID) is not None:
for res in self._resource_list:
if type(res) == str:
resources.add(
Resource(name=res, user_name=self.user_name).get_id_from_database()
)
elif type(res) == dict and res.get(ResourceKey.ID) is not None:
logger.warning(
"""`resource_list` should be defined using List[str] with resource paths,
the use of ids to define resources will be remove in version 3.2.0.
"""
)
resources.add(resource.get(ResourceKey.ID))
resources.add(res.get(ResourceKey.ID))
return [{ResourceKey.ID: r} for r in resources]
@property
def user_name(self) -> Optional[str]:
"""Return user name of process definition."""
if self.process_definition:
return self.process_definition.user.name
else:
raise PyDSParamException("`user_name` cannot be empty.")
@property
def condition_result(self) -> Dict:
"""Get attribute condition_result."""
@ -295,10 +307,3 @@ class Task(Base):
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result)
return result.get("code"), result.get("version")
def query_resource(self, full_name):
"""Get resource info from java gateway, contains resource id, name."""
gateway = launch_gateway()
return gateway.entry_point.queryResourcesFileInfo(
self.process_definition.user.name, full_name
)

36
dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py

@ -16,8 +16,10 @@
# under the License.
"""Test resource definition."""
import pytest
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.exceptions import PyDSParamException
def test_resource():
@ -25,14 +27,42 @@ def test_resource():
name = "/dev/test.py"
content = """print("hello world")"""
description = "hello world"
user_name = "test_user"
expect = {
"name": name,
"content": content,
"description": description,
"userName": user_name,
}
resourceDefinition = Resource(
name=name,
content=content,
description=description,
name=name, content=content, description=description, user_name=user_name
)
assert resourceDefinition.get_define() == expect
def test_empty_user_name():
"""Tests for the exception get info from database when the user name is null."""
name = "/dev/test.py"
content = """print("hello world")"""
description = "hello world"
resourceDefinition = Resource(name=name, content=content, description=description)
with pytest.raises(
PyDSParamException,
match="`user_name` is required when querying resources from python gate.",
):
resourceDefinition.get_info_from_database()
def test_empty_content():
"""Tests for the exception create or update resource when the user name or content is empty."""
name = "/dev/test.py"
user_name = "test_user"
description = "hello world"
resourceDefinition = Resource(
name=name, description=description, user_name=user_name
)
with pytest.raises(
PyDSParamException,
match="`user_name` and `content` are required when create or update resource from python gate.",
):
resourceDefinition.create_or_update_resource()

22
dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py

@ -63,10 +63,14 @@ TEST_TASK_RELATION_SIZE = 0
],
)
@patch(
"pydolphinscheduler.core.task.Task.query_resource",
return_value=({"id": 1, "name": "foo"}),
"pydolphinscheduler.core.resource.Resource.get_id_from_database",
return_value=1,
)
def test_property_task_params(mock_resource, attr, expect):
@patch(
"pydolphinscheduler.core.task.Task.user_name",
return_value="test_user",
)
def test_property_task_params(mock_resource, mock_user_name, attr, expect):
"""Test class task property."""
task = testTask(
"test-property-task-params",
@ -265,10 +269,16 @@ def test_add_duplicate(caplog):
return_value=(123, 1),
)
@patch(
"pydolphinscheduler.core.task.Task.query_resource",
return_value=({"id": 1, "name": "/dev/test.py"}),
"pydolphinscheduler.core.resource.Resource.get_id_from_database",
return_value=1,
)
@patch(
"pydolphinscheduler.core.task.Task.user_name",
return_value="test_user",
)
def test_python_resource_list(mock_code_version, mock_resource, resources, expect):
def test_python_resource_list(
mock_code_version, mock_resource, mock_user_name, resources, expect
):
"""Test python task resource list."""
task = Task(
name="python_resource_list.",

Loading…
Cancel
Save