Browse Source

[fix] Python gateway can not upload to resource center (#13042)

in #12076 we refactor our resource center, remove the resource table,
it is a good refactor but it failed python api upload, this patch try
to fix python api upload function

ref: #12076
3.2.0-release
Jay Chung 2 years ago committed by GitHub
parent
commit
f5a59982b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  2. 23
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  3. 116
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  4. 38
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
  5. 74
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  6. 10
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UserMapper.xml

17
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -48,7 +48,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant;
@ -59,6 +58,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.storage.StorageEntity;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import py4j.GatewayServer;
@ -632,9 +632,10 @@ public class PythonGateway {
*
* @param userName user who query resource
* @param fullName full name of the resource
* @return StorageEntity object which contains necessary information about resource
*/
public Resource queryResourcesFileInfo(String userName, String fullName) {
return resourceService.queryResourcesFileInfo(userName, fullName);
public StorageEntity queryResourcesFileInfo(String userName, String fullName) throws Exception {
return resourceService.queryFileStatus(userName, fullName);
}
public String getGatewayVersion() {
@ -647,14 +648,12 @@ public class PythonGateway {
*
* @param userName user who create or update resource
* @param fullName The fullname of resource.Includes path and suffix.
* @param description description of resource
* @param resourceContent content of resource
* @return id of resource
* @return StorageEntity object which contains necessary information about resource
*/
public Integer createOrUpdateResource(
String userName, String fullName, String description,
String resourceContent) {
return resourceService.createOrUpdateResource(userName, fullName, description, resourceContent);
public StorageEntity createOrUpdateResource(String userName, String fullName,
String resourceContent) throws Exception {
return resourceService.createOrUpdateResource(userName, fullName, resourceContent);
}
@PostConstruct

23
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -20,8 +20,8 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.service.storage.StorageEntity;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.io.IOException;
@ -170,28 +170,13 @@ public interface ResourcesService {
/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param loginUser user who create or update resource
* @param fileFullName The full name of resource.Includes path and suffix.
* @param desc description of resource
* @param content content of resource
* @return create result code
*/
Result<Object> onlineCreateOrUpdateResourceWithDir(User loginUser, String fileFullName, String desc,
String content);
/**
* create or update resource.
* If the folder is not already created, it will be
* If the folder is not already created, it will be ignored and directly create the new file
*
* @param userName user who create or update resource
* @param fullName The fullname of resource.Includes path and suffix.
* @param description description of resource
* @param resourceContent content of resource
* @return id of resource
*/
Integer createOrUpdateResource(String userName, String fullName, String description, String resourceContent);
StorageEntity createOrUpdateResource(String userName, String fullName, String resourceContent) throws Exception;
/**
* updateProcessInstance resource
@ -229,7 +214,7 @@ public interface ResourcesService {
* @param userName user who query resource
* @param fullName full name of the resource
*/
Resource queryResourcesFileInfo(String userName, String fullName);
StorageEntity queryFileStatus(String userName, String fullName) throws Exception;
/**
* delete DATA_TRANSFER data in resource center

116
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.constants.Constants.ALIAS;
import static org.apache.dolphinscheduler.common.constants.Constants.CONTENT;
import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_SS;
import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
@ -68,7 +67,6 @@ import org.apache.dolphinscheduler.service.storage.StorageEntity;
import org.apache.dolphinscheduler.service.storage.StorageOperate;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@ -1499,103 +1497,25 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param loginUser user who create or update resource
* @param fileFullName The full name of resource.Includes path and suffix.
* @param desc description of resource
* @param content content of resource
* @return create result code
*/
@Override
@Transactional
public Result<Object> onlineCreateOrUpdateResourceWithDir(User loginUser, String fileFullName, String desc,
String content) {
// TODO: need update to third party service
if (checkResourceExists(fileFullName)) {
Resource resource = resourcesMapper.queryResource(fileFullName, ResourceType.FILE.ordinal()).get(0);
Result<Object> result = this.updateResourceContent(loginUser, fileFullName,
resource.getUserName(), content);
if (result.getCode() == Status.SUCCESS.getCode()) {
resource.setDescription(desc);
Map<String, Object> resultMap = new HashMap<>();
for (Map.Entry<Object, Object> entry : new BeanMap(resource).entrySet()) {
if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
resultMap.put(entry.getKey().toString(), entry.getValue());
}
}
result.setData(resultMap);
}
return result;
} else {
String resourceSuffix = fileFullName.substring(fileFullName.indexOf(PERIOD) + 1);
String fileNameWithSuffix = fileFullName.substring(fileFullName.lastIndexOf(FOLDER_SEPARATOR) + 1);
String resourceDir = fileFullName.replace(fileNameWithSuffix, EMPTY_STRING);
String resourceName = fileNameWithSuffix.replace(PERIOD + resourceSuffix, EMPTY_STRING);
String[] dirNames = resourceDir.split(FOLDER_SEPARATOR);
int pid = -1;
StringBuilder currDirPath = new StringBuilder();
for (String dirName : dirNames) {
if (StringUtils.isNotEmpty(dirName)) {
pid = queryOrCreateDirId(loginUser, pid, currDirPath.toString(), dirName);
currDirPath.append(FOLDER_SEPARATOR).append(dirName);
}
}
return this.onlineCreateResource(
loginUser, ResourceType.FILE, resourceName, resourceSuffix, desc, content,
currDirPath.toString());
}
}
@Override
@Transactional
public Integer createOrUpdateResource(String userName, String fullName, String description,
String resourceContent) {
public StorageEntity createOrUpdateResource(String userName, String filepath,
String resourceContent) throws Exception {
User user = userMapper.queryByUserNameAccurately(userName);
int suffixLabelIndex = fullName.indexOf(PERIOD);
int suffixLabelIndex = filepath.indexOf(PERIOD);
if (suffixLabelIndex == -1) {
String msg = String.format("The suffix of file can not be empty, fullName:%s.", fullName);
logger.warn(msg);
throw new IllegalArgumentException(msg);
}
if (!fullName.startsWith(FOLDER_SEPARATOR)) {
fullName = FOLDER_SEPARATOR + fullName;
}
Result<Object> createResult = onlineCreateOrUpdateResourceWithDir(
user, fullName, description, resourceContent);
if (createResult.getCode() == Status.SUCCESS.getCode()) {
Map<String, Object> resultMap = (Map<String, Object>) createResult.getData();
return (int) resultMap.get("id");
}
String msg = String.format("Create or update resource error, resourceName:%s.", fullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
throw new IllegalArgumentException(String
.format("Not allow create or update resources without extension name, filepath: %s", filepath));
}
private int queryOrCreateDirId(User user, int pid, String currentDir, String dirName) {
String dirFullName = currentDir + FOLDER_SEPARATOR + dirName;
if (checkResourceExists(dirFullName)) {
List<Resource> resourceList = resourcesMapper.queryResource(dirFullName, ResourceType.FILE.ordinal());
return resourceList.get(0).getId();
} else {
// create dir
Result<Object> createDirResult = this.createDirectory(
user, dirName, EMPTY_STRING, ResourceType.FILE, pid, currentDir);
if (createDirResult.getCode() == Status.SUCCESS.getCode()) {
// Map<String, Object> resultMap = (Map<String, Object>) createDirResult.getData();
// return resultMap.get("id") == null ? -1 : (Integer) resultMap.get("id");
// Since resource is kept in third party services, its id will always be -1.
return -1;
String defaultPath = storageOperate.getResDir(user.getTenantCode());
String fullName = defaultPath + filepath;
} else {
String msg = String.format("Create dir error, dirFullName:%s.", dirFullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
Result<Object> result = uploadContentToStorage(user, fullName, user.getTenantCode(), resourceContent);
if (result.getCode() != Status.SUCCESS.getCode()) {
throw new ServiceException(result.getMsg());
}
return storageOperate.getFileStatus(fullName, defaultPath, user.getTenantCode(), ResourceType.FILE);
}
private void permissionPostHandle(ResourceType resourceType, User loginUser, Integer resourceId) {
@ -1864,17 +1784,13 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
}
@Override
public Resource queryResourcesFileInfo(String userName, String fileName) {
public StorageEntity queryFileStatus(String userName, String fileName) throws Exception {
// TODO: It is used in PythonGateway, should be revised
User user = userMapper.queryByUserNameAccurately(userName);
Result<Object> resourceResponse = this.queryResourceByFileName(user, fileName, ResourceType.FILE, "");
if (resourceResponse.getCode() != Status.SUCCESS.getCode()) {
String msg =
String.format("Query resource by fullName failed, userName:%s, fullName:%s", userName, fileName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
return (Resource) resourceResponse.getData();
String defaultPath = storageOperate.getResDir(user.getTenantCode());
return storageOperate.getFileStatus(defaultPath + fileName, defaultPath, user.getTenantCode(),
ResourceType.FILE);
}
@Override

38
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java

@ -21,12 +21,12 @@ import org.apache.dolphinscheduler.api.service.ResourcesService;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.storage.StorageEntity;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.util.Date;
@ -103,36 +103,30 @@ public class PythonGatewayTest {
String resourceDir = "/dir1/dir2/";
String resourceName = "test";
String resourceSuffix = "py";
String desc = "desc";
String content = "content";
String resourceFullName = resourceDir + resourceName + "." + resourceSuffix;
int resourceId = 3;
Mockito.when(resourcesService.createOrUpdateResource(user.getUserName(), resourceFullName, desc, content))
.thenReturn(resourceId);
int id = pythonGateway.createOrUpdateResource(
user.getUserName(), resourceFullName, desc, content);
Assertions.assertEquals(id, resourceId);
Assertions.assertDoesNotThrow(
() -> pythonGateway.createOrUpdateResource(user.getUserName(), resourceFullName, content));
}
@Test
public void testQueryResourcesFileInfo() {
public void testQueryResourcesFileInfo() throws Exception {
User user = getTestUser();
Resource resource = getTestResource();
Mockito.when(resourcesService.queryResourcesFileInfo(user.getUserName(), resource.getFullName()))
.thenReturn(resource);
Resource result = pythonGateway.queryResourcesFileInfo(user.getUserName(), resource.getFullName());
Assertions.assertEquals(result.getId(), resource.getId());
StorageEntity storageEntity = getTestResource();
Mockito.when(resourcesService.queryFileStatus(user.getUserName(), storageEntity.getFullName()))
.thenReturn(storageEntity);
StorageEntity result = pythonGateway.queryResourcesFileInfo(user.getUserName(), storageEntity.getFullName());
Assertions.assertEquals(result.getId(), storageEntity.getId());
}
private Resource getTestResource() {
Resource resource = new Resource();
resource.setId(1);
resource.setType(ResourceType.FILE);
resource.setFullName("/dev/test.py");
return resource;
private StorageEntity getTestResource() {
StorageEntity storageEntity = new StorageEntity();
storageEntity.setId(1);
storageEntity.setType(ResourceType.FILE);
storageEntity.setFullName("/dev/test.py");
return storageEntity;
}
private User getTestUser() {

74
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -126,6 +126,8 @@ public class ResourcesServiceTest {
private MockedStatic<PropertyUtils> mockedStaticPropertyUtils;
private Throwable exception;
@BeforeEach
public void setUp() {
mockedStaticFileUtils = Mockito.mockStatic(FileUtils.class);
@ -570,66 +572,28 @@ public class ResourcesServiceTest {
}
@Test
public void testOnlineCreateResourceWithDir() {
public void testCreateOrUpdateResource() throws Exception {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
User user = getUser();
user.setId(1);
Mockito.when(userMapper.queryByUserNameAccurately(user.getUserName())).thenReturn(getUser());
String dir1Path = "/dir1";
String dir2Path = "/dir2";
String resourceDir = dir1Path + dir2Path;
String resourceName = "test";
String resourceSuffix = "py";
String desc = "desc";
String content = "content";
String fullName = resourceDir + "/" + resourceName + "." + resourceSuffix;
Resource dir1 = new Resource();
dir1.setFullName(dir1Path);
dir1.setId(1);
dir1.setUserId(user.getId());
Resource dir2 = new Resource();
dir2.setFullName(resourceDir);
dir2.setUserId(user.getId());
Mockito.when(storageOperate.getDir(ResourceType.FILE, "123")).thenReturn("/dolphinscheduler/123/resources/");
Mockito.when(storageOperate.getResDir("123")).thenReturn("/dolphinscheduler/123/resources/");
// RESOURCE_SUFFIX_NOT_SUPPORT_VIEW
exception = Assertions.assertThrows(IllegalArgumentException.class,
() -> resourcesService.createOrUpdateResource(user.getUserName(), "filename", "my-content"));
Assertions.assertTrue(
exception.getMessage().contains("Not allow create or update resources without extension name"));
// SUCCESS
Mockito.when(storageOperate.getResDir(user.getTenantCode())).thenReturn("/dolphinscheduler/123/resources/");
Mockito.when(FileUtils.getUploadFilename(Mockito.anyString(), Mockito.anyString())).thenReturn("test");
Mockito.when(FileUtils.writeContent2File(Mockito.anyString(), Mockito.anyString())).thenReturn(true);
try {
Mockito.when(storageOperate.mkdir("123", "/dolphinscheduler/123/resources" + dir1Path)).thenReturn(true);
Mockito.when(storageOperate.mkdir("123", "/dolphinscheduler/123/resources" + dir2Path)).thenReturn(true);
} catch (IOException e) {
logger.error("create resource directory {} failed", fullName);
Mockito.when(storageOperate.getFileStatus(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
Mockito.any())).thenReturn(getStorageEntityResource());
StorageEntity storageEntity =
resourcesService.createOrUpdateResource(user.getUserName(), "filename.txt", "my-content");
Assertions.assertNotNull(storageEntity);
Assertions.assertEquals("/dolphinscheduler/123/resources/ResourcesServiceTest", storageEntity.getFullName());
}
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(user.getTenantId())).thenReturn(getTenant());
Result<Object> result = resourcesService.onlineCreateOrUpdateResourceWithDir(user, fullName, desc, content);
Assertions.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
}
// TODO: revise this testcase after modifying PythonGateway.java
// @Test
// public void testQueryResourcesFileInfo() {
// User user = getUser();
// String userName = "test-user";
// Mockito.when(userMapper.queryByUserNameAccurately(userName)).thenReturn(user);
// Resource file = new Resource();
// file.setFullName("/dir/file1.py");
// file.setId(1);
// Mockito.when(resourcesMapper.queryResource(file.getFullName(), ResourceType.FILE.ordinal()))
// .thenReturn(Collections.singletonList(file));
// Mockito.when(resourcePermissionCheckService.operationPermissionCheck(
// AuthorizationType.RESOURCE_FILE_ID, null, user.getId(), ApiFuncIdentificationConstant.FILE_VIEW,
// serviceLogger)).thenReturn(true);
// Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(
// AuthorizationType.RESOURCE_FILE_ID, new Object[]{file.getId()}, user.getId(), serviceLogger))
// .thenReturn(true);
// Mockito.when(userMapper.selectById(1)).thenReturn(getUser());
// Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
// Resource result = resourcesService.queryResourcesFileInfo(userName, file.getFullName());
// Assertions.assertEquals(file.getFullName(), result.getFullName());
// }
@Test
public void testUpdateResourceContent() {

10
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UserMapper.xml

@ -33,9 +33,13 @@
</select>
<select id="queryByUserNameAccurately" resultType="org.apache.dolphinscheduler.dao.entity.User">
select
<include refid="baseSql"/>
from t_ds_user
where user_name=#{userName}
<include refid="baseSqlV2">
<property name="alias" value="u"/>
</include>
, t.tenant_code
from t_ds_user u
left join t_ds_tenant t on u.tenant_id = t.id
where u.user_name = #{userName}
</select>
<select id="queryUserByNamePassword" resultType="org.apache.dolphinscheduler.dao.entity.User">
select

Loading…
Cancel
Save