Browse Source

[Python] Supports creating or editing resources. (#10823)

k8s/config
陈家名 2 years ago committed by GitHub
parent
commit
59cd86157f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  2. 24
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  3. 92
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  4. 20
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
  5. 60
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  6. 5
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  7. 14
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  8. 43
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
  9. 9
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
  10. 38
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py

15
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -547,6 +547,21 @@ public class PythonGateway {
return result;
}
/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param userName user who create or update resource
* @param fullName The fullname of resource.Includes path and suffix.
* @param description description of resource
* @param resourceContent content of resource
* @return id of resource
*/
public Integer createOrUpdateResource(
String userName, String fullName, String description, String resourceContent) {
return resourceService.createOrUpdateResource(userName, fullName, description, resourceContent);
}
@PostConstruct
public void init() {
if (pythonGatewayConfiguration.getEnabled()) {

24
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -168,6 +168,30 @@ public interface ResourcesService {
*/
Result<Object> onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory);
/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param loginUser user who create or update resource
* @param fileFullName The full name of resource.Includes path and suffix.
* @param desc description of resource
* @param content content of resource
* @return create result code
*/
Result<Object> onlineCreateOrUpdateResourceWithDir(User loginUser, String fileFullName, String desc, String content);
/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param userName user who create or update resource
* @param fullName The fullname of resource.Includes path and suffix.
* @param description description of resource
* @param resourceContent content of resource
* @return id of resource
*/
Integer createOrUpdateResource(String userName, String fullName, String description, String resourceContent);
/**
* updateProcessInstance resource
*

92
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -85,10 +85,12 @@ import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.ALIAS;
import static org.apache.dolphinscheduler.common.Constants.CONTENT;
import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_SS;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.Constants.JAR;
import static org.apache.dolphinscheduler.common.Constants.PERIOD;
/**
* resources service impl
@ -1117,6 +1119,96 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param loginUser user who create or update resource
* @param fileFullName The full name of resource.Includes path and suffix.
* @param desc description of resource
* @param content content of resource
* @return create result code
*/
@Override
@Transactional
public Result<Object> onlineCreateOrUpdateResourceWithDir(User loginUser, String fileFullName, String desc, String content) {
if (checkResourceExists(fileFullName, ResourceType.FILE.ordinal())) {
Resource resource = resourcesMapper.queryResource(fileFullName, ResourceType.FILE.ordinal()).get(0);
Result<Object> result = this.updateResourceContent(loginUser, resource.getId(), content);
if (result.getCode() == Status.SUCCESS.getCode()) {
resource.setDescription(desc);
Map<String, Object> resultMap = new HashMap<>();
for (Map.Entry<Object, Object> entry : new BeanMap(resource).entrySet()) {
if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
resultMap.put(entry.getKey().toString(), entry.getValue());
}
}
result.setData(resultMap);
}
return result;
} else {
String resourceSuffix = fileFullName.substring(fileFullName.indexOf(PERIOD) + 1);
String fileNameWithSuffix = fileFullName.substring(fileFullName.lastIndexOf(FOLDER_SEPARATOR) + 1);
String resourceDir = fileFullName.replace(fileNameWithSuffix, EMPTY_STRING);
String resourceName = fileNameWithSuffix.replace(PERIOD + resourceSuffix, EMPTY_STRING);
String[] dirNames = resourceDir.split(FOLDER_SEPARATOR);
int pid = -1;
StringBuilder currDirPath = new StringBuilder();
for (String dirName : dirNames) {
if (StringUtils.isNotEmpty(dirName)) {
pid = queryOrCreateDirId(loginUser, pid, currDirPath.toString(), dirName);
currDirPath.append(FOLDER_SEPARATOR).append(dirName);
}
}
return this.onlineCreateResource(
loginUser, ResourceType.FILE, resourceName, resourceSuffix, desc, content, pid, currDirPath.toString());
}
}
@Override
@Transactional
public Integer createOrUpdateResource(String userName, String fullName, String description, String resourceContent) {
User user = userMapper.queryByUserNameAccurately(userName);
int suffixLabelIndex = fullName.indexOf(PERIOD);
if (suffixLabelIndex == -1) {
String msg = String.format("The suffix of file can not be empty : %s", fullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
if (!fullName.startsWith(FOLDER_SEPARATOR)) {
fullName = FOLDER_SEPARATOR + fullName;
}
Result<Object> createResult = onlineCreateOrUpdateResourceWithDir(
user, fullName, description, resourceContent);
if (createResult.getCode() == Status.SUCCESS.getCode()) {
Map<String, Object> resultMap = (Map<String, Object>) createResult.getData();
return (int) resultMap.get("id");
}
String msg = String.format("Can not create or update resource : %s", fullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
private int queryOrCreateDirId(User user, int pid, String currentDir, String dirName) {
String dirFullName = currentDir + FOLDER_SEPARATOR + dirName;
if (checkResourceExists(dirFullName, ResourceType.FILE.ordinal())) {
List<Resource> resourceList = resourcesMapper.queryResource(dirFullName, ResourceType.FILE.ordinal());
return resourceList.get(0).getId();
} else {
// create dir
Result<Object> createDirResult = this.createDirectory(
user, dirName, EMPTY_STRING, ResourceType.FILE, pid, currentDir);
if (createDirResult.getCode() == Status.SUCCESS.getCode()) {
Map<String, Object> resultMap = (Map<String, Object>) createDirResult.getData();
return (int) resultMap.get("id");
} else {
String msg = String.format("Can not create dir %s", dirFullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
}
}
private void permissionPostHandle(ResourceType resourceType, User loginUser, Integer resourceId) {
AuthorizationType authorizationType = resourceType.equals(ResourceType.FILE) ? AuthorizationType.RESOURCE_FILE_ID : AuthorizationType.UDF_FILE;
permissionPostHandle(authorizationType, loginUser.getId(), Collections.singletonList(resourceId), logger);

20
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java

@ -96,6 +96,26 @@ public class PythonGatewayTest {
Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode());
}
@Test
public void testCreateResource() {
User user = getTestUser();
String resourceDir = "/dir1/dir2/";
String resourceName = "test";
String resourceSuffix = "py";
String desc = "desc";
String content = "content";
String resourceFullName = resourceDir + resourceName + "." + resourceSuffix;
int resourceId = 3;
Mockito.when(resourcesService.createOrUpdateResource(user.getUserName(), resourceFullName, desc, content))
.thenReturn(resourceId);
int id = pythonGateway.createOrUpdateResource(
user.getUserName(), resourceFullName, desc, content);
Assert.assertEquals(id, resourceId);
}
@Test
public void testQueryResourcesFileInfo() {

60
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -605,6 +605,64 @@ public class ResourcesServiceTest {
}
@Test
public void testOnlineCreateResourceWithDir() {
User user = getUser();
user.setId(1);
String dir1Path = "/dir1";
String dir2Path = "/dir2";
String resourceDir = dir1Path + dir2Path;
String resourceName = "test";
String resourceSuffix = "py";
String desc = "desc";
String content = "content";
String fullName = resourceDir + "/" + resourceName + "." + resourceSuffix;
Resource dir1 = new Resource();
dir1.setFullName(dir1Path);
dir1.setId(1);
dir1.setUserId(user.getId());
Resource dir2 = new Resource();
dir2.setFullName(resourceDir);
dir2.setUserId(user.getId());
Mockito.when(resourcesMapper.queryResource(dir1.getFullName(), ResourceType.FILE.ordinal())).thenReturn(Collections.singletonList(dir1));
Mockito.when(resourcesMapper.queryResource(resourceDir, ResourceType.FILE.ordinal())).thenReturn(null);
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FILE_VIEW, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, new Object[]{dir1.getId()}, 1, serviceLogger)).thenReturn(true);
Tenant tenant = getTenant();
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FOLDER_ONLINE_CREATE, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, null, 1, serviceLogger)).thenReturn(true);
try {
PowerMockito.when(storageOperate.mkdir(tenant.getTenantCode(), null)).thenReturn(true);
} catch (IOException e) {
logger.error("storage error", e);
}
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FILE_ONLINE_CREATE, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, null, 1, serviceLogger)).thenReturn(true);
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FILE_RENAME, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, null, 1, serviceLogger)).thenReturn(true);
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(resourcesMapper.selectById(dir1.getId())).thenReturn(dir1);
Mockito.when(resourcesMapper.selectById(dir2.getId())).thenReturn(dir2);
Mockito.when(FileUtils.getUploadFilename(Mockito.anyString(), Mockito.anyString())).thenReturn("test");
PowerMockito.when(FileUtils.writeContent2File(Mockito.anyString(), Mockito.anyString())).thenReturn(true);
Result<Object> result = resourcesService.onlineCreateOrUpdateResourceWithDir(user, fullName, desc, content);
Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
}
@Test
public void testUpdateResourceContent() {
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
@ -896,7 +954,7 @@ public class ResourcesServiceTest {
return resource;
}
private Resource getResource(int resourceId,ResourceType type) {
private Resource getResource(int resourceId, ResourceType type) {
Resource resource = new Resource();
resource.setId(resourceId);

5
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -159,6 +159,11 @@ public final class Constants {
*/
public static final String COLON = ":";
/**
* period .
*/
public static final String PERIOD = ".";
/**
* QUESTION ?
*/

14
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -63,6 +63,9 @@ class ProcessDefinition(Base):
thought Web UI after it :func:`submit` or :func:`run`. It will create a new project belongs to
``user`` if it does not exists. And when ``project`` exists but project's create do not belongs
to ``user``, will grant `project` to ``user`` automatically.
:param resource_list: Resource files required by the current process definition.You can create and modify
resource files from this field. When the process definition is submitted, these resource files are also
submitted along with it.
"""
# key attribute for identify ProcessDefinition object
@ -88,6 +91,7 @@ class ProcessDefinition(Base):
"tasks",
"task_definition_json",
"task_relation_json",
"resource_list",
}
def __init__(
@ -107,6 +111,7 @@ class ProcessDefinition(Base):
timeout: Optional[int] = 0,
release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None,
resource_list: Optional[List] = None,
):
super().__init__(name, description)
self.schedule = schedule
@ -132,6 +137,7 @@ class ProcessDefinition(Base):
# TODO how to fix circle import
self._task_relations: set["TaskRelation"] = set() # noqa: F821
self._process_definition_code = None
self.resource_list = resource_list or []
def __enter__(self) -> "ProcessDefinition":
ProcessDefinitionContext.set(self)
@ -407,6 +413,14 @@ class ProcessDefinition(Base):
None,
None,
)
if len(self.resource_list) > 0:
for res in self.resource_list:
gateway.entry_point.createOrUpdateResource(
self._user,
res.name,
res.description,
res.content,
)
return self._process_definition_code
def start(self) -> None:

43
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py

@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Module resource."""
from typing import Optional
from pydolphinscheduler.core.base import Base
class Resource(Base):
"""resource object, will define the resources that you want to create or update.
:param name: The fullname of resource.Includes path and suffix.
:param content: The description of resource.
:param description: The description of resource.
"""
_DEFINE_ATTR = {"name", "content", "description"}
def __init__(
self,
name: str,
content: str,
description: Optional[str] = None,
):
super().__init__(name, description)
self.content = content
self._resource_code = None

9
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@ -18,7 +18,7 @@
"""Test process definition."""
from datetime import datetime
from typing import Any
from typing import Any, List
from unittest.mock import patch
import pytest
@ -26,6 +26,7 @@ from freezegun import freeze_time
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.side import Project, Tenant, User
from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
@ -90,6 +91,11 @@ def test_process_definition_default_value(name, value):
("warning_group_id", int, 1),
("timeout", int, 1),
("param", dict, {"key": "value"}),
(
"resource_list",
List,
[Resource(name="/dev/test.py", content="hello world", description="desc")],
),
],
)
def test_set_attr(name, cls, expect):
@ -321,6 +327,7 @@ def test_process_definition_get_define_without_task():
"tasks": {},
"taskDefinitionJson": [{}],
"taskRelationJson": [{}],
"resourceList": [],
}
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
assert pd.get_define() == expect

38
dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py

@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test resource definition."""
from pydolphinscheduler.core.resource import Resource
def test_resource():
"""Test resource set attributes which get with same type."""
name = "/dev/test.py"
content = """print("hello world")"""
description = "hello world"
expect = {
"name": name,
"content": content,
"description": description,
}
resourceDefinition = Resource(
name=name,
content=content,
description=description,
)
assert resourceDefinition.get_define() == expect
Loading…
Cancel
Save