diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java index cc09b2d650..2af8ccd767 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java @@ -16,6 +16,11 @@ */ package org.apache.dolphinscheduler.api.controller; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.ResourcesService; @@ -26,11 +31,6 @@ import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.entity.User; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiImplicitParam; -import io.swagger.annotations.ApiImplicitParams; -import io.swagger.annotations.ApiOperation; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -63,23 +63,15 @@ public class ResourcesController extends BaseController { private UdfFuncService udfFuncService; /** - * create resource + * create directory * - * @param loginUser login user - * @param alias alias - * @param description description - * @param type type - * @return create result code - */ - - /** * @param loginUser login user * @param type type * @param alias alias * @param description description * @param pid parent id * @param currentDir current directory - * @return + * @return create result code */ @ApiOperation(value = "createDirctory", notes = "CREATE_RESOURCE_NOTES") @ApiImplicitParams({ @@ -140,6 +132,7 @@ public class ResourcesController extends BaseController { * @param resourceId resource id * @param type resource type * @param description description + * @param file resource file * @return update result code */ @ApiOperation(value = "updateResource", notes = "UPDATE_RESOURCE_NOTES") @@ -147,7 +140,8 @@ public class ResourcesController extends BaseController { @ApiImplicitParam(name = "id", value = "RESOURCE_ID", required = true, dataType = "Int", example = "100"), @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType = "ResourceType"), @ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType = "String"), - @ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType = "String") + @ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType = "String"), + @ApiImplicitParam(name = "file", value = "RESOURCE_FILE", required = true, dataType = "MultipartFile") }) @PostMapping(value = "/update") @ApiException(UPDATE_RESOURCE_ERROR) @@ -155,10 +149,11 @@ public class ResourcesController extends BaseController { @RequestParam(value = "id") int resourceId, @RequestParam(value = "type") ResourceType type, @RequestParam(value = "name") String alias, - @RequestParam(value = "description", required = false) String description) { - logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}", - loginUser.getUserName(), type, alias, description); - return resourceService.updateResource(loginUser, resourceId, alias, description, type); + @RequestParam(value = "description", required = false) String description, + @RequestParam(value = "file" ,required = false) MultipartFile file) { + logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}, file: {}", + loginUser.getUserName(), type, alias, description, file); + return resourceService.updateResource(loginUser, resourceId, alias, description, type, file); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java index 15b106f63e..e0ee7117c4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java @@ -287,6 +287,7 @@ public class ResourcesService extends BaseService { * @param name name * @param desc description * @param type resource type + * @param file resource file * @return update result code */ @Transactional(rollbackFor = Exception.class) @@ -294,7 +295,8 @@ public class ResourcesService extends BaseService { int resourceId, String name, String desc, - ResourceType type) { + ResourceType type, + MultipartFile file) { Result result = new Result(); // if resource upload startup @@ -330,6 +332,42 @@ public class ResourcesService extends BaseService { return result; } + if (file != null) { + + // file is empty + if (file.isEmpty()) { + logger.error("file is empty: {}", file.getOriginalFilename()); + putMsg(result, Status.RESOURCE_FILE_IS_EMPTY); + return result; + } + + // file suffix + String fileSuffix = FileUtils.suffix(file.getOriginalFilename()); + String nameSuffix = FileUtils.suffix(name); + + // determine file suffix + if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) { + /** + * rename file suffix and original suffix must be consistent + */ + logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename()); + putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE); + return result; + } + + //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar + if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(FileUtils.suffix(originFullName))) { + logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg()); + putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR); + return result; + } + if (file.getSize() > Constants.MAX_FILE_SIZE) { + logger.error("file size is too large: {}", file.getOriginalFilename()); + putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT); + return result; + } + } + // query tenant by user id String tenantCode = getTenantCode(resource.getUserId(),result); if (StringUtils.isEmpty(tenantCode)){ @@ -379,26 +417,33 @@ public class ResourcesService extends BaseService { } // updateResource data - List childrenResource = listAllChildren(resource,false); + Date now = new Date(); resource.setAlias(name); resource.setFullName(fullName); resource.setDescription(desc); resource.setUpdateTime(now); + if (file != null) { + resource.setFileName(file.getOriginalFilename()); + resource.setSize(file.getSize()); + } try { resourcesMapper.updateById(resource); - if (resource.isDirectory() && CollectionUtils.isNotEmpty(childrenResource)) { - String matcherFullName = Matcher.quoteReplacement(fullName); - List childResourceList = new ArrayList<>(); - List resourceList = resourcesMapper.listResourceByIds(childrenResource.toArray(new Integer[childrenResource.size()])); - childResourceList = resourceList.stream().map(t -> { - t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName)); - t.setUpdateTime(now); - return t; - }).collect(Collectors.toList()); - resourcesMapper.batchUpdateResource(childResourceList); + if (resource.isDirectory()) { + List childrenResource = listAllChildren(resource,false); + if (CollectionUtils.isNotEmpty(childrenResource)) { + String matcherFullName = Matcher.quoteReplacement(fullName); + List childResourceList = new ArrayList<>(); + List resourceList = resourcesMapper.listResourceByIds(childrenResource.toArray(new Integer[childrenResource.size()])); + childResourceList = resourceList.stream().map(t -> { + t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName)); + t.setUpdateTime(now); + return t; + }).collect(Collectors.toList()); + resourcesMapper.batchUpdateResource(childResourceList); + } } putMsg(result, Status.SUCCESS); @@ -414,11 +459,31 @@ public class ResourcesService extends BaseService { logger.error(Status.UPDATE_RESOURCE_ERROR.getMsg(), e); throw new ServiceException(Status.UPDATE_RESOURCE_ERROR); } + // if name unchanged, return directly without moving on HDFS - if (originResourceName.equals(name)) { + if (originResourceName.equals(name) && file == null) { return result; } + if (file != null) { + // fail upload + if (!upload(loginUser, fullName, file, type)) { + logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename()); + putMsg(result, Status.HDFS_OPERATION_ERROR); + throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename())); + } + if (!fullName.equals(originFullName)) { + try { + HadoopUtils.getInstance().delete(originHdfsFileName,false); + } catch (IOException e) { + logger.error(e.getMessage(),e); + throw new RuntimeException(String.format("delete resource: %s failed.", originFullName)); + } + } + return result; + } + + // get the path of dest file in hdfs String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java index 407f6b587f..1e3c95d775 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.api.service; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; @@ -159,19 +158,19 @@ public class ResourcesServiceTest { PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); User user = new User(); //HDFS_NOT_STARTUP - Result result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE); + Result result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg()); //RESOURCE_NOT_EXIST Mockito.when(resourcesMapper.selectById(1)).thenReturn(getResource()); PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); - result = resourcesService.updateResource(user,0,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE); + result = resourcesService.updateResource(user,0,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg()); //USER_NO_OPERATION_PERM - result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(),result.getMsg()); @@ -186,7 +185,7 @@ public class ResourcesServiceTest { } catch (IOException e) { logger.error(e.getMessage(),e); } - result = resourcesService.updateResource(user, 1, "ResourcesServiceTest1.jar", "ResourcesServiceTest", ResourceType.UDF); + result = resourcesService.updateResource(user, 1, "ResourcesServiceTest1.jar", "ResourcesServiceTest", ResourceType.UDF,null); Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg()); //SUCCESS @@ -199,25 +198,25 @@ public class ResourcesServiceTest { logger.error(e.getMessage(),e); } - result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg()); //RESOURCE_EXIST Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList()); - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg()); //USER_NOT_EXIST Mockito.when(userMapper.selectById(Mockito.anyInt())).thenReturn(null); - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF,null); logger.info(result.toString()); Assert.assertTrue(Status.USER_NOT_EXIST.getCode() == result.getCode()); //TENANT_NOT_EXIST Mockito.when(userMapper.selectById(1)).thenReturn(getUser()); Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(null); - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF,null); logger.info(result.toString()); Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(),result.getMsg()); @@ -231,7 +230,7 @@ public class ResourcesServiceTest { logger.error(e.getMessage(),e); } - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF,null); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg()); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java index c460354fe5..287f7267bc 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java @@ -42,22 +42,4 @@ public class ResourceInfo { public void setRes(String res) { this.res = res; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ResourceInfo that = (ResourceInfo) o; - - if (id != that.id) return false; - return res.equals(that.res); - } - - @Override - public int hashCode() { - int result = id; - result = 31 * result + res.hashCode(); - return result; - } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java index 4e58201bf3..32a2a6b05d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java @@ -216,7 +216,7 @@ public class SparkParameters extends AbstractParameters { @Override public boolean checkParameters() { - return mainJar != null && programType != null && sparkVersion != null; + return mainJar != null && programType != null; } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index c0bc3aaea8..6692598ce6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -663,37 +663,58 @@ public class MasterExecThread implements Runnable { if(startNodes.contains(taskName)){ return DependResult.SUCCESS; } - TaskNode taskNode = dag.getNode(taskName); List depNameList = taskNode.getDepList(); for(String depsNode : depNameList ){ if(!dag.containsNode(depsNode) - || forbiddenTaskList.containsKey(depsNode) - || skipTaskNodeList.containsKey(depsNode)){ + || forbiddenTaskList.containsKey(depsNode)){ continue; } - // dependencies must be fully completed + if(skipTaskNodeList.containsKey(depsNode)){ + return DependResult.FAILED; + } + // all the dependencies must be completed if(!completeTaskList.containsKey(depsNode)){ return DependResult.WAITING; } ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); - // conditions task would not return failed. - if(depTaskState.typeIsFailure() - && !DagHelper.haveConditionsAfterNode(depsNode, dag ) - && !dag.getNode(depsNode).isConditionsTask()){ - return DependResult.FAILED; - } - if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ return DependResult.WAITING; } + // ignore task state if current task is condition + if(taskNode.isConditionsTask()){ + continue; + } + if(!dependTaskSuccess(depsNode, taskName)){ + return DependResult.FAILED; + } } - logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray())); - return DependResult.SUCCESS; } + /** + * depend node is completed, but here need check the condition task branch is the next node + * @param dependNodeName + * @param nextNodeName + * @return + */ + private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){ + if(dag.getNode(dependNodeName).isConditionsTask()){ + //condition task need check the branch to run + List nextTaskList = parseConditionTask(dependNodeName); + if(!nextTaskList.contains(nextNodeName)){ + return false; + } + }else { + ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState(); + if(depTaskState.typeIsFailure()){ + return false; + } + } + return true; + } + /** * query task instance by complete state diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java index 3a8c8fe7d6..1bc79ab94e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java @@ -70,14 +70,16 @@ public class UDFUtils { */ private static void buildJarSql(List sqls, Map udfFuncTenantCodeMap) { String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS); - + String resourceFullName; Set> entries = udfFuncTenantCodeMap.entrySet(); for (Map.Entry entry:entries){ String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue()); if (!uploadPath.startsWith("hdfs:")) { uploadPath = defaultFS + uploadPath; } - sqls.add(String.format("add jar %s%s", uploadPath, entry.getKey().getResourceName())); + resourceFullName = entry.getKey().getResourceName(); + resourceFullName = resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s",resourceFullName); + sqls.add(String.format("add jar %s%s", uploadPath, resourceFullName)); } } diff --git a/sql/soft_version b/sql/soft_version index 6261a05bb0..d5e98f723f 100644 --- a/sql/soft_version +++ b/sql/soft_version @@ -1 +1 @@ -1.3.1 \ No newline at end of file +1.3.2 \ No newline at end of file