Browse Source

Merge branch '1.3.2-release' of https://github.com/apache/incubator-dolphinscheduler into 1.3.2-release

pull/3/MERGE
break60 4 years ago
parent
commit
e03b95b835
  1. 35
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
  2. 73
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  3. 19
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  4. 18
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java
  5. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
  6. 47
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  7. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
  8. 2
      sql/soft_version

35
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java

@ -16,6 +16,11 @@
*/
package org.apache.dolphinscheduler.api.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ResourcesService;
@ -26,11 +31,6 @@ import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -63,23 +63,15 @@ public class ResourcesController extends BaseController {
private UdfFuncService udfFuncService;
/**
* create resource
* create directory
*
* @param loginUser login user
* @param alias alias
* @param description description
* @param type type
* @return create result code
*/
/**
* @param loginUser login user
* @param type type
* @param alias alias
* @param description description
* @param pid parent id
* @param currentDir current directory
* @return
* @return create result code
*/
@ApiOperation(value = "createDirctory", notes = "CREATE_RESOURCE_NOTES")
@ApiImplicitParams({
@ -140,6 +132,7 @@ public class ResourcesController extends BaseController {
* @param resourceId resource id
* @param type resource type
* @param description description
* @param file resource file
* @return update result code
*/
@ApiOperation(value = "updateResource", notes = "UPDATE_RESOURCE_NOTES")
@ -147,7 +140,8 @@ public class ResourcesController extends BaseController {
@ApiImplicitParam(name = "id", value = "RESOURCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType = "ResourceType"),
@ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType = "String")
@ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType = "String"),
@ApiImplicitParam(name = "file", value = "RESOURCE_FILE", required = true, dataType = "MultipartFile")
})
@PostMapping(value = "/update")
@ApiException(UPDATE_RESOURCE_ERROR)
@ -155,10 +149,11 @@ public class ResourcesController extends BaseController {
@RequestParam(value = "id") int resourceId,
@RequestParam(value = "type") ResourceType type,
@RequestParam(value = "name") String alias,
@RequestParam(value = "description", required = false) String description) {
logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}",
loginUser.getUserName(), type, alias, description);
return resourceService.updateResource(loginUser, resourceId, alias, description, type);
@RequestParam(value = "description", required = false) String description,
@RequestParam(value = "file" ,required = false) MultipartFile file) {
logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}, file: {}",
loginUser.getUserName(), type, alias, description, file);
return resourceService.updateResource(loginUser, resourceId, alias, description, type, file);
}
/**

73
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -287,6 +287,7 @@ public class ResourcesService extends BaseService {
* @param name name
* @param desc description
* @param type resource type
* @param file resource file
* @return update result code
*/
@Transactional(rollbackFor = Exception.class)
@ -294,7 +295,8 @@ public class ResourcesService extends BaseService {
int resourceId,
String name,
String desc,
ResourceType type) {
ResourceType type,
MultipartFile file) {
Result result = new Result();
// if resource upload startup
@ -330,6 +332,42 @@ public class ResourcesService extends BaseService {
return result;
}
if (file != null) {
// file is empty
if (file.isEmpty()) {
logger.error("file is empty: {}", file.getOriginalFilename());
putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
return result;
}
// file suffix
String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
String nameSuffix = FileUtils.suffix(name);
// determine file suffix
if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
/**
* rename file suffix and original suffix must be consistent
*/
logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename());
putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
return result;
}
//If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(FileUtils.suffix(originFullName))) {
logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
return result;
}
if (file.getSize() > Constants.MAX_FILE_SIZE) {
logger.error("file size is too large: {}", file.getOriginalFilename());
putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
return result;
}
}
// query tenant by user id
String tenantCode = getTenantCode(resource.getUserId(),result);
if (StringUtils.isEmpty(tenantCode)){
@ -379,17 +417,23 @@ public class ResourcesService extends BaseService {
}
// updateResource data
List<Integer> childrenResource = listAllChildren(resource,false);
Date now = new Date();
resource.setAlias(name);
resource.setFullName(fullName);
resource.setDescription(desc);
resource.setUpdateTime(now);
if (file != null) {
resource.setFileName(file.getOriginalFilename());
resource.setSize(file.getSize());
}
try {
resourcesMapper.updateById(resource);
if (resource.isDirectory() && CollectionUtils.isNotEmpty(childrenResource)) {
if (resource.isDirectory()) {
List<Integer> childrenResource = listAllChildren(resource,false);
if (CollectionUtils.isNotEmpty(childrenResource)) {
String matcherFullName = Matcher.quoteReplacement(fullName);
List<Resource> childResourceList = new ArrayList<>();
List<Resource> resourceList = resourcesMapper.listResourceByIds(childrenResource.toArray(new Integer[childrenResource.size()]));
@ -400,6 +444,7 @@ public class ResourcesService extends BaseService {
}).collect(Collectors.toList());
resourcesMapper.batchUpdateResource(childResourceList);
}
}
putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
@ -414,11 +459,31 @@ public class ResourcesService extends BaseService {
logger.error(Status.UPDATE_RESOURCE_ERROR.getMsg(), e);
throw new ServiceException(Status.UPDATE_RESOURCE_ERROR);
}
// if name unchanged, return directly without moving on HDFS
if (originResourceName.equals(name)) {
if (originResourceName.equals(name) && file == null) {
return result;
}
if (file != null) {
// fail upload
if (!upload(loginUser, fullName, file, type)) {
logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename());
putMsg(result, Status.HDFS_OPERATION_ERROR);
throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
}
if (!fullName.equals(originFullName)) {
try {
HadoopUtils.getInstance().delete(originHdfsFileName,false);
} catch (IOException e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(String.format("delete resource: %s failed.", originFullName));
}
}
return result;
}
// get the path of dest file in hdfs
String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName);

19
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
@ -159,19 +158,19 @@ public class ResourcesServiceTest {
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
User user = new User();
//HDFS_NOT_STARTUP
Result result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE);
Result result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
//RESOURCE_NOT_EXIST
Mockito.when(resourcesMapper.selectById(1)).thenReturn(getResource());
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
result = resourcesService.updateResource(user,0,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE);
result = resourcesService.updateResource(user,0,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg());
//USER_NO_OPERATION_PERM
result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(),result.getMsg());
@ -186,7 +185,7 @@ public class ResourcesServiceTest {
} catch (IOException e) {
logger.error(e.getMessage(),e);
}
result = resourcesService.updateResource(user, 1, "ResourcesServiceTest1.jar", "ResourcesServiceTest", ResourceType.UDF);
result = resourcesService.updateResource(user, 1, "ResourcesServiceTest1.jar", "ResourcesServiceTest", ResourceType.UDF,null);
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg());
//SUCCESS
@ -199,25 +198,25 @@ public class ResourcesServiceTest {
logger.error(e.getMessage(),e);
}
result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
//RESOURCE_EXIST
Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList());
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg());
//USER_NOT_EXIST
Mockito.when(userMapper.selectById(Mockito.anyInt())).thenReturn(null);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF,null);
logger.info(result.toString());
Assert.assertTrue(Status.USER_NOT_EXIST.getCode() == result.getCode());
//TENANT_NOT_EXIST
Mockito.when(userMapper.selectById(1)).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(null);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF,null);
logger.info(result.toString());
Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(),result.getMsg());
@ -231,7 +230,7 @@ public class ResourcesServiceTest {
logger.error(e.getMessage(),e);
}
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF,null);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());

18
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java

@ -42,22 +42,4 @@ public class ResourceInfo {
public void setRes(String res) {
this.res = res;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ResourceInfo that = (ResourceInfo) o;
if (id != that.id) return false;
return res.equals(that.res);
}
@Override
public int hashCode() {
int result = id;
result = 31 * result + res.hashCode();
return result;
}
}

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java

@ -216,7 +216,7 @@ public class SparkParameters extends AbstractParameters {
@Override
public boolean checkParameters() {
return mainJar != null && programType != null && sparkVersion != null;
return mainJar != null && programType != null;
}
@Override

47
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -663,37 +663,58 @@ public class MasterExecThread implements Runnable {
if(startNodes.contains(taskName)){
return DependResult.SUCCESS;
}
TaskNode taskNode = dag.getNode(taskName);
List<String> depNameList = taskNode.getDepList();
for(String depsNode : depNameList ){
if(!dag.containsNode(depsNode)
|| forbiddenTaskList.containsKey(depsNode)
|| skipTaskNodeList.containsKey(depsNode)){
|| forbiddenTaskList.containsKey(depsNode)){
continue;
}
// dependencies must be fully completed
if(skipTaskNodeList.containsKey(depsNode)){
return DependResult.FAILED;
}
// all the dependencies must be completed
if(!completeTaskList.containsKey(depsNode)){
return DependResult.WAITING;
}
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
// conditions task would not return failed.
if(depTaskState.typeIsFailure()
&& !DagHelper.haveConditionsAfterNode(depsNode, dag )
&& !dag.getNode(depsNode).isConditionsTask()){
return DependResult.FAILED;
}
if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){
return DependResult.WAITING;
}
// ignore task state if current task is condition
if(taskNode.isConditionsTask()){
continue;
}
if(!dependTaskSuccess(depsNode, taskName)){
return DependResult.FAILED;
}
}
logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray()));
return DependResult.SUCCESS;
}
/**
* depend node is completed, but here need check the condition task branch is the next node
* @param dependNodeName
* @param nextNodeName
* @return
*/
private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){
if(dag.getNode(dependNodeName).isConditionsTask()){
//condition task need check the branch to run
List<String> nextTaskList = parseConditionTask(dependNodeName);
if(!nextTaskList.contains(nextNodeName)){
return false;
}
}else {
ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState();
if(depTaskState.typeIsFailure()){
return false;
}
}
return true;
}
/**
* query task instance by complete state

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java

@ -70,14 +70,16 @@ public class UDFUtils {
*/
private static void buildJarSql(List<String> sqls, Map<UdfFunc,String> udfFuncTenantCodeMap) {
String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS);
String resourceFullName;
Set<Map.Entry<UdfFunc,String>> entries = udfFuncTenantCodeMap.entrySet();
for (Map.Entry<UdfFunc,String> entry:entries){
String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue());
if (!uploadPath.startsWith("hdfs:")) {
uploadPath = defaultFS + uploadPath;
}
sqls.add(String.format("add jar %s%s", uploadPath, entry.getKey().getResourceName()));
resourceFullName = entry.getKey().getResourceName();
resourceFullName = resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s",resourceFullName);
sqls.add(String.format("add jar %s%s", uploadPath, resourceFullName));
}
}

2
sql/soft_version

@ -1 +1 @@
1.3.1
1.3.2
Loading…
Cancel
Save