Browse Source

[Python] Supports creating or editing resources. (#10823)

(cherry picked from commit 59cd86157f)
3.0.0/version-upgrade
陈家名 2 years ago committed by Jiajie Zhong
parent
commit
4da1240b0e
  1. 15
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  2. 24
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  3. 125
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  4. 20
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
  5. 60
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  6. 5
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  7. 43
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
  8. 9
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
  9. 38
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py

15
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -549,6 +549,21 @@ public class PythonGateway {
return result; return result;
} }
/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param userName user who create or update resource
* @param fullName The fullname of resource.Includes path and suffix.
* @param description description of resource
* @param resourceContent content of resource
* @return id of resource
*/
public Integer createOrUpdateResource(
String userName, String fullName, String description, String resourceContent) {
return resourceService.createOrUpdateResource(userName, fullName, description, resourceContent);
}
@PostConstruct @PostConstruct
public void init() { public void init() {
if (pythonGatewayConfiguration.getEnabled()) { if (pythonGatewayConfiguration.getEnabled()) {

24
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -168,6 +168,30 @@ public interface ResourcesService {
*/ */
Result<Object> onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory); Result<Object> onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory);
/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param loginUser user who create or update resource
* @param fileFullName The full name of resource.Includes path and suffix.
* @param desc description of resource
* @param content content of resource
* @return create result code
*/
Result<Object> onlineCreateOrUpdateResourceWithDir(User loginUser, String fileFullName, String desc, String content);
/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param userName user who create or update resource
* @param fullName The fullname of resource.Includes path and suffix.
* @param description description of resource
* @param resourceContent content of resource
* @return id of resource
*/
Integer createOrUpdateResource(String userName, String fullName, String description, String resourceContent);
/** /**
* updateProcessInstance resource * updateProcessInstance resource
* *

125
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -66,16 +66,28 @@ import org.springframework.web.multipart.MultipartFile;
import java.io.IOException; import java.io.IOException;
import java.rmi.ServerException; import java.rmi.ServerException;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.*; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.ALIAS; import static org.apache.dolphinscheduler.common.Constants.ALIAS;
import static org.apache.dolphinscheduler.common.Constants.CONTENT; import static org.apache.dolphinscheduler.common.Constants.CONTENT;
import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR; import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_SS; import static org.apache.dolphinscheduler.common.Constants.FORMAT_SS;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S; import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.Constants.JAR; import static org.apache.dolphinscheduler.common.Constants.JAR;
import static org.apache.dolphinscheduler.common.Constants.PERIOD;
/** /**
* resources service impl * resources service impl
@ -118,7 +130,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
* @return create directory result * @return create directory result
*/ */
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional
public Result<Object> createDirectory(User loginUser, public Result<Object> createDirectory(User loginUser,
String name, String name,
String description, String description,
@ -189,7 +201,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
* @return create result code * @return create result code
*/ */
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional
public Result<Object> createResource(User loginUser, public Result<Object> createResource(User loginUser,
String name, String name,
String desc, String desc,
@ -207,6 +219,12 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result; return result;
} }
// make sure login user has tenant
String tenantCode = getTenantCode(loginUser.getId(), result);
if (StringUtils.isEmpty(tenantCode)) {
return result;
}
result = verifyFile(name, type, file); result = verifyFile(name, type, file);
if (!result.getCode().equals(Status.SUCCESS.getCode())) { if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result; return result;
@ -304,7 +322,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
* @return update result code * @return update result code
*/ */
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional
public Result<Object> updateResource(User loginUser, public Result<Object> updateResource(User loginUser,
int resourceId, int resourceId,
String name, String name,
@ -733,7 +751,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
* @throws IOException exception * @throws IOException exception
*/ */
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional
public Result<Object> delete(User loginUser, int resourceId) throws IOException { public Result<Object> delete(User loginUser, int resourceId) throws IOException {
Result<Object> result = checkResourceUploadStartupState(); Result<Object> result = checkResourceUploadStartupState();
if (!result.getCode().equals(Status.SUCCESS.getCode())) { if (!result.getCode().equals(Status.SUCCESS.getCode())) {
@ -980,7 +998,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
* @return create result code * @return create result code
*/ */
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional
public Result<Object> onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content, int pid, String currentDir) { public Result<Object> onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content, int pid, String currentDir) {
Result<Object> result = checkResourceUploadStartupState(); Result<Object> result = checkResourceUploadStartupState();
if (!result.getCode().equals(Status.SUCCESS.getCode())) { if (!result.getCode().equals(Status.SUCCESS.getCode())) {
@ -1035,6 +1053,101 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result; return result;
} }
/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param loginUser user who create or update resource
* @param fileFullName The full name of resource.Includes path and suffix.
* @param desc description of resource
* @param content content of resource
* @return create result code
*/
@Override
@Transactional
public Result<Object> onlineCreateOrUpdateResourceWithDir(User loginUser, String fileFullName, String desc, String content) {
if (checkResourceExists(fileFullName, ResourceType.FILE.ordinal())) {
Resource resource = resourcesMapper.queryResource(fileFullName, ResourceType.FILE.ordinal()).get(0);
Result<Object> result = this.updateResourceContent(loginUser, resource.getId(), content);
if (result.getCode() == Status.SUCCESS.getCode()) {
resource.setDescription(desc);
Map<String, Object> resultMap = new HashMap<>();
for (Map.Entry<Object, Object> entry : new BeanMap(resource).entrySet()) {
if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
resultMap.put(entry.getKey().toString(), entry.getValue());
}
}
result.setData(resultMap);
}
return result;
} else {
String resourceSuffix = fileFullName.substring(fileFullName.indexOf(PERIOD) + 1);
String fileNameWithSuffix = fileFullName.substring(fileFullName.lastIndexOf(FOLDER_SEPARATOR) + 1);
String resourceDir = fileFullName.replace(fileNameWithSuffix, EMPTY_STRING);
String resourceName = fileNameWithSuffix.replace(PERIOD + resourceSuffix, EMPTY_STRING);
String[] dirNames = resourceDir.split(FOLDER_SEPARATOR);
int pid = -1;
StringBuilder currDirPath = new StringBuilder();
for (String dirName : dirNames) {
if (StringUtils.isNotEmpty(dirName)) {
pid = queryOrCreateDirId(loginUser, pid, currDirPath.toString(), dirName);
currDirPath.append(FOLDER_SEPARATOR).append(dirName);
}
}
return this.onlineCreateResource(
loginUser, ResourceType.FILE, resourceName, resourceSuffix, desc, content, pid, currDirPath.toString());
}
}
@Override
@Transactional
public Integer createOrUpdateResource(String userName, String fullName, String description, String resourceContent) {
User user = userMapper.queryByUserNameAccurately(userName);
int suffixLabelIndex = fullName.indexOf(PERIOD);
if (suffixLabelIndex == -1) {
String msg = String.format("The suffix of file can not be empty : %s", fullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
if (!fullName.startsWith(FOLDER_SEPARATOR)) {
fullName = FOLDER_SEPARATOR + fullName;
}
Result<Object> createResult = onlineCreateOrUpdateResourceWithDir(
user, fullName, description, resourceContent);
if (createResult.getCode() == Status.SUCCESS.getCode()) {
Map<String, Object> resultMap = (Map<String, Object>) createResult.getData();
return (int) resultMap.get("id");
}
String msg = String.format("Can not create or update resource : %s", fullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
private int queryOrCreateDirId(User user, int pid, String currentDir, String dirName) {
String dirFullName = currentDir + FOLDER_SEPARATOR + dirName;
if (checkResourceExists(dirFullName, ResourceType.FILE.ordinal())) {
List<Resource> resourceList = resourcesMapper.queryResource(dirFullName, ResourceType.FILE.ordinal());
return resourceList.get(0).getId();
} else {
// create dir
Result<Object> createDirResult = this.createDirectory(
user, dirName, EMPTY_STRING, ResourceType.FILE, pid, currentDir);
if (createDirResult.getCode() == Status.SUCCESS.getCode()) {
Map<String, Object> resultMap = (Map<String, Object>) createDirResult.getData();
return (int) resultMap.get("id");
} else {
String msg = String.format("Can not create dir %s", dirFullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
}
}
private void permissionPostHandle(ResourceType resourceType, User loginUser, Integer resourceId) {
AuthorizationType authorizationType = resourceType.equals(ResourceType.FILE) ? AuthorizationType.RESOURCE_FILE_ID : AuthorizationType.UDF_FILE;
permissionPostHandle(authorizationType, loginUser.getId(), Collections.singletonList(resourceId), logger);
}
private Result<Object> checkResourceUploadStartupState() { private Result<Object> checkResourceUploadStartupState() {
Result<Object> result = new Result<>(); Result<Object> result = new Result<>();
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);

20
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java

@ -96,6 +96,26 @@ public class PythonGatewayTest {
Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode()); Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode());
} }
@Test
public void testCreateResource() {
User user = getTestUser();
String resourceDir = "/dir1/dir2/";
String resourceName = "test";
String resourceSuffix = "py";
String desc = "desc";
String content = "content";
String resourceFullName = resourceDir + resourceName + "." + resourceSuffix;
int resourceId = 3;
Mockito.when(resourcesService.createOrUpdateResource(user.getUserName(), resourceFullName, desc, content))
.thenReturn(resourceId);
int id = pythonGateway.createOrUpdateResource(
user.getUserName(), resourceFullName, desc, content);
Assert.assertEquals(id, resourceId);
}
@Test @Test
public void testQueryResourcesFileInfo() { public void testQueryResourcesFileInfo() {

60
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -511,6 +511,64 @@ public class ResourcesServiceTest {
} }
@Test
public void testOnlineCreateResourceWithDir() {
User user = getUser();
user.setId(1);
String dir1Path = "/dir1";
String dir2Path = "/dir2";
String resourceDir = dir1Path + dir2Path;
String resourceName = "test";
String resourceSuffix = "py";
String desc = "desc";
String content = "content";
String fullName = resourceDir + "/" + resourceName + "." + resourceSuffix;
Resource dir1 = new Resource();
dir1.setFullName(dir1Path);
dir1.setId(1);
dir1.setUserId(user.getId());
Resource dir2 = new Resource();
dir2.setFullName(resourceDir);
dir2.setUserId(user.getId());
Mockito.when(resourcesMapper.queryResource(dir1.getFullName(), ResourceType.FILE.ordinal())).thenReturn(Collections.singletonList(dir1));
Mockito.when(resourcesMapper.queryResource(resourceDir, ResourceType.FILE.ordinal())).thenReturn(null);
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FILE_VIEW, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, new Object[]{dir1.getId()}, 1, serviceLogger)).thenReturn(true);
Tenant tenant = getTenant();
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FOLDER_ONLINE_CREATE, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, null, 1, serviceLogger)).thenReturn(true);
try {
PowerMockito.when(storageOperate.mkdir(tenant.getTenantCode(), null)).thenReturn(true);
} catch (IOException e) {
logger.error("storage error", e);
}
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FILE_ONLINE_CREATE, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, null, 1, serviceLogger)).thenReturn(true);
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, 1, ApiFuncIdentificationConstant.FILE_RENAME, serviceLogger)).thenReturn(true);
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
AuthorizationType.RESOURCE_FILE_ID, null, 1, serviceLogger)).thenReturn(true);
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(resourcesMapper.selectById(dir1.getId())).thenReturn(dir1);
Mockito.when(resourcesMapper.selectById(dir2.getId())).thenReturn(dir2);
Mockito.when(FileUtils.getUploadFilename(Mockito.anyString(), Mockito.anyString())).thenReturn("test");
PowerMockito.when(FileUtils.writeContent2File(Mockito.anyString(), Mockito.anyString())).thenReturn(true);
Result<Object> result = resourcesService.onlineCreateOrUpdateResourceWithDir(user, fullName, desc, content);
Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
}
@Test @Test
public void testUpdateResourceContent() { public void testUpdateResourceContent() {
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
@ -780,7 +838,7 @@ public class ResourcesServiceTest {
return resource; return resource;
} }
private Resource getResource(int resourceId,ResourceType type) { private Resource getResource(int resourceId, ResourceType type) {
Resource resource = new Resource(); Resource resource = new Resource();
resource.setId(resourceId); resource.setId(resourceId);

5
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -161,6 +161,11 @@ public final class Constants {
*/ */
public static final String COLON = ":"; public static final String COLON = ":";
/**
* period .
*/
public static final String PERIOD = ".";
/** /**
* QUESTION ? * QUESTION ?
*/ */

43
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py

@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Module resource."""
from typing import Optional
from pydolphinscheduler.core.base import Base
class Resource(Base):
"""resource object, will define the resources that you want to create or update.
:param name: The fullname of resource.Includes path and suffix.
:param content: The description of resource.
:param description: The description of resource.
"""
_DEFINE_ATTR = {"name", "content", "description"}
def __init__(
self,
name: str,
content: str,
description: Optional[str] = None,
):
super().__init__(name, description)
self.content = content
self._resource_code = None

9
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@ -18,7 +18,7 @@
"""Test process definition.""" """Test process definition."""
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any, List
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
@ -26,6 +26,7 @@ from freezegun import freeze_time
from pydolphinscheduler.core import configuration from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.side import Project, Tenant, User from pydolphinscheduler.side import Project, Tenant, User
from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
@ -90,6 +91,11 @@ def test_process_definition_default_value(name, value):
("warning_group_id", int, 1), ("warning_group_id", int, 1),
("timeout", int, 1), ("timeout", int, 1),
("param", dict, {"key": "value"}), ("param", dict, {"key": "value"}),
(
"resource_list",
List,
[Resource(name="/dev/test.py", content="hello world", description="desc")],
),
], ],
) )
def test_set_attr(name, cls, expect): def test_set_attr(name, cls, expect):
@ -321,6 +327,7 @@ def test_process_definition_get_define_without_task():
"tasks": {}, "tasks": {},
"taskDefinitionJson": [{}], "taskDefinitionJson": [{}],
"taskRelationJson": [{}], "taskRelationJson": [{}],
"resourceList": [],
} }
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
assert pd.get_define() == expect assert pd.get_define() == expect

38
dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py

@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test resource definition."""
from pydolphinscheduler.core.resource import Resource
def test_resource():
"""Test resource set attributes which get with same type."""
name = "/dev/test.py"
content = """print("hello world")"""
description = "hello world"
expect = {
"name": name,
"content": content,
"description": description,
}
resourceDefinition = Resource(
name=name,
content=content,
description=description,
)
assert resourceDefinition.get_define() == expect
Loading…
Cancel
Save