From 403a4a20f2a6e12f1448c0950ae58c37c3213806 Mon Sep 17 00:00:00 2001 From: baoliang Date: Fri, 6 Nov 2020 11:44:34 +0800 Subject: [PATCH] merge from 1.3.3-release --- dolphinscheduler-alert/pom.xml | 2 +- dolphinscheduler-api/pom.xml | 2 +- .../api/controller/AccessTokenController.java | 6 +- .../api/controller/ResourcesController.java | 50 ++--- .../api/controller/UsersController.java | 2 +- .../api/service/AccessTokenService.java | 7 +- .../api/service/ResourcesService.java | 181 ++++++++++++++--- .../api/service/UdfFuncService.java | 20 +- .../api/service/UsersService.java | 51 ++--- .../impl/ProcessDefinitionServiceImpl.java | 2 +- .../api/service/impl/TenantServiceImpl.java | 1 + .../api/service/AccessTokenServiceTest.java | 16 +- .../api/service/ResourcesServiceTest.java | 22 +- .../api/service/UsersServiceTest.java | 10 +- dolphinscheduler-common/pom.xml | 2 +- dolphinscheduler-dao/pom.xml | 2 +- .../dao/mapper/UdfFuncMapper.java | 7 + .../dolphinscheduler/dao/utils/DagHelper.java | 177 +++++++++++----- .../dao/mapper/UdfFuncMapper.xml | 13 ++ dolphinscheduler-dist/pom.xml | 2 +- dolphinscheduler-microbench/pom.xml | 2 +- dolphinscheduler-plugin-api/pom.xml | 2 +- dolphinscheduler-remote/pom.xml | 2 +- .../remote/command/CommandType.java | 10 + .../remote/command/DBTaskAckCommand.java | 4 +- .../remote/command/DBTaskResponseCommand.java | 4 +- dolphinscheduler-server/pom.xml | 2 +- .../runner/ConditionsTaskExecThread.java | 159 +++++++++++++++ .../runner/MasterBaseTaskExecThread.java | 103 +++++++++- .../master/runner/MasterExecThread.java | 191 ++++++------------ .../worker/task/AbstractCommandExecutor.java | 3 + dolphinscheduler-service/pom.xml | 2 +- .../service/process/ProcessService.java | 59 +++--- dolphinscheduler-ui/pom.xml | 2 +- pom.xml | 2 +- 35 files changed, 769 insertions(+), 353 deletions(-) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java diff --git a/dolphinscheduler-alert/pom.xml b/dolphinscheduler-alert/pom.xml index 215916ddf7..a44d101ffe 100644 --- a/dolphinscheduler-alert/pom.xml +++ b/dolphinscheduler-alert/pom.xml @@ -21,7 +21,7 @@ org.apache.dolphinscheduler dolphinscheduler - 1.3.2-SNAPSHOT + 1.3.4-SNAPSHOT dolphinscheduler-alert ${project.artifactId} diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml index cd6e04fc26..2af0ed2ffa 100644 --- a/dolphinscheduler-api/pom.xml +++ b/dolphinscheduler-api/pom.xml @@ -21,7 +21,7 @@ org.apache.dolphinscheduler dolphinscheduler - 1.3.2-SNAPSHOT + 1.3.4-SNAPSHOT dolphinscheduler-api ${project.artifactId} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AccessTokenController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AccessTokenController.java index 2457177cdf..17faad04bc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AccessTokenController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AccessTokenController.java @@ -86,7 +86,7 @@ public class AccessTokenController extends BaseController { logger.info("login user {}, create token , userId : {} , token expire time : {} , token : {}", loginUser.getUserName(), userId, expireTime, token); - Map result = accessTokenService.createToken(userId, expireTime, token); + Map result = accessTokenService.createToken(loginUser, userId, expireTime, token); return returnDataList(result); } @@ -106,7 +106,7 @@ public class AccessTokenController extends BaseController { @RequestParam(value = "userId") int userId, @RequestParam(value = "expireTime") String expireTime) { logger.info("login user {}, generate token , userId : {} , token expire time : {}", loginUser, userId, expireTime); - Map result = accessTokenService.generateToken(userId, expireTime); + Map result = accessTokenService.generateToken(loginUser, userId, expireTime); return returnDataList(result); } @@ -185,7 +185,7 @@ public class AccessTokenController extends BaseController { logger.info("login user {}, update token , userId : {} , token expire time : {} , token : {}", loginUser.getUserName(), userId, expireTime, token); - Map result = accessTokenService.updateToken(id, userId, expireTime, token); + Map result = accessTokenService.updateToken(loginUser, id, userId, expireTime, token); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java index a0ec666ed7..3f23493961 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.service.ResourcesService; import org.apache.dolphinscheduler.api.service.UdfFuncService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.utils.ParameterUtils; @@ -65,21 +66,13 @@ public class ResourcesController extends BaseController { /** * create directory * - * @param loginUser login user - * @param alias alias - * @param description description - * @param type type - * @return create result code - */ - - /** * @param loginUser login user * @param type type * @param alias alias * @param description description * @param pid parent id * @param currentDir current directory - * @return + * @return create result code */ @ApiOperation(value = "createDirctory", notes = "CREATE_RESOURCE_NOTES") @ApiImplicitParams({ @@ -140,6 +133,7 @@ public class ResourcesController extends BaseController { * @param resourceId resource id * @param type resource type * @param description description + * @param file resource file * @return update result code */ @ApiOperation(value = "updateResource", notes = "UPDATE_RESOURCE_NOTES") @@ -147,7 +141,8 @@ public class ResourcesController extends BaseController { @ApiImplicitParam(name = "id", value = "RESOURCE_ID", required = true, dataType = "Int", example = "100"), @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType = "ResourceType"), @ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType = "String"), - @ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType = "String") + @ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType = "String"), + @ApiImplicitParam(name = "file", value = "RESOURCE_FILE", required = true, dataType = "MultipartFile") }) @PostMapping(value = "/update") @ApiException(UPDATE_RESOURCE_ERROR) @@ -155,10 +150,11 @@ public class ResourcesController extends BaseController { @RequestParam(value = "id") int resourceId, @RequestParam(value = "type") ResourceType type, @RequestParam(value = "name") String alias, - @RequestParam(value = "description", required = false) String description) { - logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}", - loginUser.getUserName(), type, alias, description); - return resourceService.updateResource(loginUser, resourceId, alias, description, type); + @RequestParam(value = "description", required = false) String description, + @RequestParam(value = "file" ,required = false) MultipartFile file) { + logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}, file: {}", + loginUser.getUserName(), type, alias, description, file); + return resourceService.updateResource(loginUser, resourceId, alias, description, type, file); } /** @@ -280,7 +276,7 @@ public class ResourcesController extends BaseController { * @param type resource type * @return resource list */ - @ApiOperation(value = "queryResourceJarList", notes = "QUERY_RESOURCE_LIST_NOTES") + @ApiOperation(value = "queryResourceByProgramType", notes = "QUERY_RESOURCE_LIST_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType = "ResourceType") }) @@ -288,10 +284,14 @@ public class ResourcesController extends BaseController { @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_RESOURCES_LIST_ERROR) public Result queryResourceJarList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value = "type") ResourceType type + @RequestParam(value = "type") ResourceType type, + @RequestParam(value = "programType",required = false) ProgramType programType ) { - logger.info("query resource list, login user:{}, resource type:{}", loginUser.getUserName(), type.toString()); - Map result = resourceService.queryResourceJarList(loginUser, type); + String programTypeName = programType == null ? "" : programType.name(); + String userName = loginUser.getUserName(); + userName = userName.replaceAll("[\n|\r|\t]", "_"); + logger.info("query resource list, login user:{}, resource type:{}, program type:{}", userName,programTypeName); + Map result = resourceService.queryResourceByProgramType(loginUser, type,programType); return returnDataList(result); } @@ -569,7 +569,7 @@ public class ResourcesController extends BaseController { @GetMapping(value = "/udf-func/list-paging") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_UDF_FUNCTION_LIST_PAGING_ERROR) - public Result queryUdfFuncList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + public Result queryUdfFuncListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam("pageNo") Integer pageNo, @RequestParam(value = "searchVal", required = false) String searchVal, @RequestParam("pageSize") Integer pageSize @@ -586,23 +586,25 @@ public class ResourcesController extends BaseController { } /** - * query resource list by type + * query udf func list by type * * @param loginUser login user * @param type resource type * @return resource list */ - @ApiOperation(value = "queryResourceList", notes = "QUERY_RESOURCE_LIST_NOTES") + @ApiOperation(value = "queryUdfFuncList", notes = "QUERY_UDF_FUNC_LIST_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "type", value = "UDF_TYPE", required = true, dataType = "UdfType") }) @GetMapping(value = "/udf-func/list") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_DATASOURCE_BY_TYPE_ERROR) - public Result queryResourceList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + public Result queryUdfFuncList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam("type") UdfType type) { - logger.info("query datasource list, user:{}, type:{}", loginUser.getUserName(), type); - Map result = udfFuncService.queryResourceList(loginUser, type.ordinal()); + String userName = loginUser.getUserName(); + userName = userName.replaceAll("[\n|\r|\t]", "_"); + logger.info("query udf func list, user:{}, type:{}", userName, type); + Map result = udfFuncService.queryUdfFuncList(loginUser, type.ordinal()); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/UsersController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/UsersController.java index 8d6f9fc820..9b16265e32 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/UsersController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/UsersController.java @@ -166,7 +166,7 @@ public class UsersController extends BaseController { @RequestParam(value = "state", required = false) int state) throws Exception { logger.info("login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}, state: {}", loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone, queue, state); - Map result = usersService.updateUser(id, userName, userPassword, email, tenantId, phone, queue, state); + Map result = usersService.updateUser(loginUser, id, userName, userPassword, email, tenantId, phone, queue, state); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AccessTokenService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AccessTokenService.java index 98eef47090..b1c320566f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AccessTokenService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AccessTokenService.java @@ -44,7 +44,8 @@ public interface AccessTokenService { * @param token token string * @return create result code */ - Map createToken(int userId, String expireTime, String token); + Map createToken(User loginUser, int userId, String expireTime, String token); + /** * generate token @@ -53,7 +54,7 @@ public interface AccessTokenService { * @param expireTime token expire time * @return token string */ - Map generateToken(int userId, String expireTime); + Map generateToken(User loginUser, int userId, String expireTime); /** * delete access token @@ -73,5 +74,5 @@ public interface AccessTokenService { * @param token token string * @return update result code */ - Map updateToken(int id, int userId, String expireTime, String token); + Map updateToken(User loginUser, int id, int userId, String expireTime, String token); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java index bd7598979d..dffc55edfe 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.*; @@ -87,7 +88,7 @@ public class ResourcesService extends BaseService { * @param currentDir current directory * @return create directory result */ - @Transactional(rollbackFor = RuntimeException.class) + @Transactional(rollbackFor = Exception.class) public Result createDirectory(User loginUser, String name, String description, @@ -101,8 +102,11 @@ public class ResourcesService extends BaseService { putMsg(result, Status.HDFS_NOT_STARTUP); return result; } - String fullName = "/".equals(currentDir) ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name); - + String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name); + result = verifyResourceName(fullName,type,loginUser); + if (!result.getCode().equals(Status.SUCCESS.getCode())) { + return result; + } if (pid != -1) { Resource parentResource = resourcesMapper.selectById(pid); @@ -165,7 +169,7 @@ public class ResourcesService extends BaseService { * @param currentDir current directory * @return create result code */ - @Transactional(rollbackFor = RuntimeException.class) + @Transactional(rollbackFor = Exception.class) public Result createResource(User loginUser, String name, String desc, @@ -230,7 +234,7 @@ public class ResourcesService extends BaseService { } // check resoure name exists - String fullName = "/".equals(currentDir) ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name); + String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name); if (checkResourceExists(fullName, 0, type.ordinal())) { logger.error("resource {} has exist, can't recreate", name); putMsg(result, Status.RESOURCE_EXIST); @@ -288,14 +292,16 @@ public class ResourcesService extends BaseService { * @param name name * @param desc description * @param type resource type + * @param file resource file * @return update result code */ - @Transactional(rollbackFor = RuntimeException.class) + @Transactional(rollbackFor = Exception.class) public Result updateResource(User loginUser, int resourceId, String name, String desc, - ResourceType type) { + ResourceType type, + MultipartFile file) { Result result = new Result(); // if resource upload startup @@ -315,7 +321,7 @@ public class ResourcesService extends BaseService { return result; } - if (name.equals(resource.getAlias()) && desc.equals(resource.getDescription())) { + if (file == null && name.equals(resource.getAlias()) && desc.equals(resource.getDescription())) { putMsg(result, Status.SUCCESS); return result; } @@ -331,6 +337,42 @@ public class ResourcesService extends BaseService { return result; } + if (file != null) { + + // file is empty + if (file.isEmpty()) { + logger.error("file is empty: {}", file.getOriginalFilename()); + putMsg(result, Status.RESOURCE_FILE_IS_EMPTY); + return result; + } + + // file suffix + String fileSuffix = FileUtils.suffix(file.getOriginalFilename()); + String nameSuffix = FileUtils.suffix(name); + + // determine file suffix + if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) { + /** + * rename file suffix and original suffix must be consistent + */ + logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename()); + putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE); + return result; + } + + //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar + if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(FileUtils.suffix(originFullName))) { + logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg()); + putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR); + return result; + } + if (file.getSize() > Constants.MAX_FILE_SIZE) { + logger.error("file size is too large: {}", file.getOriginalFilename()); + putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT); + return result; + } + } + // query tenant by user id String tenantCode = getTenantCode(resource.getUserId(),result); if (StringUtils.isEmpty(tenantCode)){ @@ -380,31 +422,61 @@ public class ResourcesService extends BaseService { } // updateResource data - List childrenResource = listAllChildren(resource,false); Date now = new Date(); resource.setAlias(name); resource.setFullName(fullName); resource.setDescription(desc); resource.setUpdateTime(now); + if (file != null) { + resource.setFileName(file.getOriginalFilename()); + resource.setSize(file.getSize()); + } try { resourcesMapper.updateById(resource); - if (resource.isDirectory() && CollectionUtils.isNotEmpty(childrenResource)) { - String matcherFullName = Matcher.quoteReplacement(fullName); - List childResourceList = new ArrayList<>(); - List resourceList = resourcesMapper.listResourceByIds(childrenResource.toArray(new Integer[childrenResource.size()])); - childResourceList = resourceList.stream().map(t -> { - t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName)); - t.setUpdateTime(now); - return t; - }).collect(Collectors.toList()); - resourcesMapper.batchUpdateResource(childResourceList); + if (resource.isDirectory()) { + List childrenResource = listAllChildren(resource,false); + if (CollectionUtils.isNotEmpty(childrenResource)) { + String matcherFullName = Matcher.quoteReplacement(fullName); + List childResourceList = new ArrayList<>(); + Integer[] childResIdArray = childrenResource.toArray(new Integer[childrenResource.size()]); + List resourceList = resourcesMapper.listResourceByIds(childResIdArray); + childResourceList = resourceList.stream().map(t -> { + t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName)); + t.setUpdateTime(now); + return t; + }).collect(Collectors.toList()); + resourcesMapper.batchUpdateResource(childResourceList); + + if (ResourceType.UDF.equals(resource.getType())) { + List udfFuncs = udfFunctionMapper.listUdfByResourceId(childResIdArray); + if (CollectionUtils.isNotEmpty(udfFuncs)) { + udfFuncs = udfFuncs.stream().map(t -> { + t.setResourceName(t.getResourceName().replaceFirst(originFullName, matcherFullName)); + t.setUpdateTime(now); + return t; + }).collect(Collectors.toList()); + udfFunctionMapper.batchUpdateUdfFunc(udfFuncs); + } + } + } + } else if (ResourceType.UDF.equals(resource.getType())) { + List udfFuncs = udfFunctionMapper.listUdfByResourceId(new Integer[]{resourceId}); + if (CollectionUtils.isNotEmpty(udfFuncs)) { + udfFuncs = udfFuncs.stream().map(t -> { + t.setResourceName(fullName); + t.setUpdateTime(now); + return t; + }).collect(Collectors.toList()); + udfFunctionMapper.batchUpdateUdfFunc(udfFuncs); + } + } putMsg(result, Status.SUCCESS); Map dataMap = new BeanMap(resource); - Map resultMap = new HashMap<>(); + Map resultMap = new HashMap<>(5); for (Map.Entry entry: dataMap.entrySet()) { if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) { resultMap.put(entry.getKey().toString(), entry.getValue()); @@ -415,11 +487,31 @@ public class ResourcesService extends BaseService { logger.error(Status.UPDATE_RESOURCE_ERROR.getMsg(), e); throw new ServiceException(Status.UPDATE_RESOURCE_ERROR); } + // if name unchanged, return directly without moving on HDFS - if (originResourceName.equals(name)) { + if (originResourceName.equals(name) && file == null) { + return result; + } + + if (file != null) { + // fail upload + if (!upload(loginUser, fullName, file, type)) { + logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename()); + putMsg(result, Status.HDFS_OPERATION_ERROR); + throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename())); + } + if (!fullName.equals(originFullName)) { + try { + HadoopUtils.getInstance().delete(originHdfsFileName,false); + } catch (IOException e) { + logger.error(e.getMessage(),e); + throw new RuntimeException(String.format("delete resource: %s failed.", originFullName)); + } + } return result; } + // get the path of dest file in hdfs String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName); @@ -449,7 +541,7 @@ public class ResourcesService extends BaseService { */ public Map queryResourceListPaging(User loginUser, int direcotryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) { - HashMap result = new HashMap<>(); + HashMap result = new HashMap<>(5); Page page = new Page(pageNo, pageSize); int userId = loginUser.getId(); if (isAdmin(loginUser)) { @@ -550,7 +642,7 @@ public class ResourcesService extends BaseService { */ public Map queryResourceList(User loginUser, ResourceType type) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); int userId = loginUser.getId(); if(isAdmin(loginUser)){ @@ -565,21 +657,33 @@ public class ResourcesService extends BaseService { } /** - * query resource list + * query resource list by program type * * @param loginUser login user * @param type resource type * @return resource list */ - public Map queryResourceJarList(User loginUser, ResourceType type) { + public Map queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); + String suffix = ".jar"; int userId = loginUser.getId(); if(isAdmin(loginUser)){ userId = 0; } + if (programType != null) { + switch (programType) { + case JAVA: + break; + case SCALA: + break; + case PYTHON: + suffix = ".py"; + break; + } + } List allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0); - List resources = new ResourceFilter(".jar",new ArrayList<>(allResourceList)).filter(); + List resources = new ResourceFilter(suffix,new ArrayList<>(allResourceList)).filter(); Visitor resourceTreeVisitor = new ResourceTreeVisitor(resources); result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren()); putMsg(result,Status.SUCCESS); @@ -829,7 +933,7 @@ public class ResourcesService extends BaseService { * @param content content * @return create result code */ - @Transactional(rollbackFor = RuntimeException.class) + @Transactional(rollbackFor = Exception.class) public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory) { Result result = new Result(); // if resource upload startup @@ -852,12 +956,25 @@ public class ResourcesService extends BaseService { } String name = fileName.trim() + "." + nameSuffix; - String fullName = "/".equals(currentDirectory) ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name); + String fullName = currentDirectory.equals("/") ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name); result = verifyResourceName(fullName,type,loginUser); if (!result.getCode().equals(Status.SUCCESS.getCode())) { return result; } + if (pid != -1) { + Resource parentResource = resourcesMapper.selectById(pid); + + if (parentResource == null) { + putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST); + return result; + } + + if (!hasPerm(loginUser, parentResource.getUserId())) { + putMsg(result, Status.USER_NO_OPERATION_PERM); + return result; + } + } // save data Date now = new Date(); @@ -891,7 +1008,7 @@ public class ResourcesService extends BaseService { * @param content content * @return update result cod */ - @Transactional(rollbackFor = RuntimeException.class) + @Transactional(rollbackFor = Exception.class) public Result updateResourceContent(int resourceId, String content) { Result result = new Result(); @@ -1096,7 +1213,7 @@ public class ResourcesService extends BaseService { * @return unauthorized result code */ public Map unauthorizedUDFFunction(User loginUser, Integer userId) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); //only admin can operate if (checkAdmin(loginUser, result)) { return result; @@ -1148,7 +1265,7 @@ public class ResourcesService extends BaseService { * @return authorized result */ public Map authorizedFile(User loginUser, Integer userId) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); if (checkAdmin(loginUser, result)){ return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java index 04f641f279..cd962fdc70 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java @@ -148,7 +148,7 @@ public class UdfFuncService extends BaseService{ */ public Map queryUdfFuncDetail(int id) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); UdfFunc udfFunc = udfFuncMapper.selectById(id); if (udfFunc == null) { putMsg(result, Status.RESOURCE_NOT_EXIST); @@ -244,7 +244,7 @@ public class UdfFuncService extends BaseService{ * @return udf function list page */ public Map queryUdfFuncListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); PageInfo pageInfo = new PageInfo(pageNo, pageSize); @@ -276,15 +276,19 @@ public class UdfFuncService extends BaseService{ } /** - * query data resource by type + * query udf list * * @param loginUser login user - * @param type resource type - * @return resource list + * @param type udf type + * @return udf func list */ - public Map queryResourceList(User loginUser, Integer type) { - Map result = new HashMap<>(); - List udfFuncList = udfFuncMapper.getUdfFuncByType(loginUser.getId(), type); + public Map queryUdfFuncList(User loginUser, Integer type) { + Map result = new HashMap<>(5); + int userId = loginUser.getId(); + if (isAdmin(loginUser)) { + userId = 0; + } + List udfFuncList = udfFuncMapper.getUdfFuncByType(userId, type); result.put(Constants.DATA_LIST, udfFuncList); putMsg(result, Status.SUCCESS); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java index 89038ad09f..e47eb42cb5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java @@ -103,7 +103,7 @@ public class UsersService extends BaseService { String queue, int state) throws Exception { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); //check all user params String msg = this.checkUserParams(userName, userPassword, email, phone); @@ -231,7 +231,7 @@ public class UsersService extends BaseService { * @return user list page */ public Map queryUserList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { return result; @@ -253,6 +253,8 @@ public class UsersService extends BaseService { /** * updateProcessInstance user * + * + * @param loginUser * @param userId user id * @param userName user name * @param userPassword user password @@ -263,7 +265,7 @@ public class UsersService extends BaseService { * @return update result code * @throws Exception exception */ - public Map updateUser(int userId, + public Map updateUser(User loginUser, int userId, String userName, String userPassword, String email, @@ -271,16 +273,17 @@ public class UsersService extends BaseService { String phone, String queue, int state) throws Exception { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); result.put(Constants.STATUS, false); + if (check(result, !hasPerm(loginUser, userId), Status.USER_NO_OPERATION_PERM)) { + return result; + } User user = userMapper.selectById(userId); - if (user == null) { putMsg(result, Status.USER_NOT_EXIST, userId); return result; } - if (StringUtils.isNotEmpty(userName)) { if (!CheckUtils.checkUserName(userName)){ @@ -394,7 +397,7 @@ public class UsersService extends BaseService { * @throws Exception exception when operate hdfs */ public Map deleteUserById(User loginUser, int id) throws Exception { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); //only admin can operate if (!isAdmin(loginUser)) { putMsg(result, Status.USER_NO_OPERATION_PERM, id); @@ -434,7 +437,7 @@ public class UsersService extends BaseService { */ @Transactional(rollbackFor = RuntimeException.class) public Map grantProject(User loginUser, int userId, String projectIds) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); result.put(Constants.STATUS, false); //only admin can operate @@ -484,7 +487,7 @@ public class UsersService extends BaseService { */ @Transactional(rollbackFor = RuntimeException.class) public Map grantResources(User loginUser, int userId, String resourceIds) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); //only admin can operate if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { return result; @@ -581,7 +584,7 @@ public class UsersService extends BaseService { */ @Transactional(rollbackFor = RuntimeException.class) public Map grantUDFFunction(User loginUser, int userId, String udfIds) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); //only admin can operate if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { @@ -628,7 +631,7 @@ public class UsersService extends BaseService { */ @Transactional(rollbackFor = RuntimeException.class) public Map grantDataSource(User loginUser, int userId, String datasourceIds) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); result.put(Constants.STATUS, false); //only admin can operate @@ -708,7 +711,7 @@ public class UsersService extends BaseService { * @return user list */ public Map queryAllGeneralUsers(User loginUser) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); //only admin can operate if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { return result; @@ -729,7 +732,7 @@ public class UsersService extends BaseService { * @return user list */ public Map queryUserList(User loginUser) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); //only admin can operate if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { return result; @@ -773,7 +776,7 @@ public class UsersService extends BaseService { */ public Map unauthorizedUser(User loginUser, Integer alertgroupId) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); //only admin can operate if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { return result; @@ -809,7 +812,7 @@ public class UsersService extends BaseService { * @return authorized result code */ public Map authorizedUser(User loginUser, Integer alertgroupId) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); //only admin can operate if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { return result; @@ -821,24 +824,6 @@ public class UsersService extends BaseService { return result; } - /** - * check - * - * @param result result - * @param bool bool - * @param userNoOperationPerm status - * @return check result - */ - private boolean check(Map result, boolean bool, Status userNoOperationPerm) { - //only admin can operate - if (bool) { - result.put(Constants.STATUS, userNoOperationPerm); - result.put(Constants.MSG, userNoOperationPerm.getMsg()); - return true; - } - return false; - } - /** * @param tenantId tenant id * @return true if tenant exists, otherwise return false diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 7f1db3ff18..8c9797b274 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -881,7 +881,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements processMeta.getProcessDefinitionLocations(), processMeta.getProcessDefinitionConnects()); putMsg(result, Status.SUCCESS); - } catch (JsonProcessingException e) { + } catch (Exception e) { logger.error("import process meta json data: {}", e.getMessage(), e); putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java index 52899f5c09..2b3d832333 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java @@ -308,6 +308,7 @@ public class TenantServiceImpl extends BaseService implements TenantService { } else { putMsg(result, Status.TENANT_NOT_EXIST); } + return result; } /** diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AccessTokenServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AccessTokenServiceTest.java index f5543487ea..e10d7185f9 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AccessTokenServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AccessTokenServiceTest.java @@ -81,7 +81,7 @@ public class AccessTokenServiceTest { public void testCreateToken() { when(accessTokenMapper.insert(any(AccessToken.class))).thenReturn(2); - Map result = accessTokenService.createToken(1, getDate(), "AccessTokenServiceTest"); + Map result = accessTokenService.createToken(getLoginUser(), 1, getDate(), "AccessTokenServiceTest"); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } @@ -89,7 +89,7 @@ public class AccessTokenServiceTest { @Test public void testGenerateToken() { - Map result = accessTokenService.generateToken(Integer.MAX_VALUE, getDate()); + Map result = accessTokenService.generateToken(getLoginUser(), Integer.MAX_VALUE,getDate()); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); String token = (String) result.get(Constants.DATA_LIST); @@ -121,16 +121,24 @@ public class AccessTokenServiceTest { public void testUpdateToken() { when(accessTokenMapper.selectById(1)).thenReturn(getEntity()); - Map result = accessTokenService.updateToken(1, Integer.MAX_VALUE, getDate(), "token"); + Map result = accessTokenService.updateToken(getLoginUser(), 1,Integer.MAX_VALUE,getDate(),"token"); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); // not exist - result = accessTokenService.updateToken(2, Integer.MAX_VALUE, getDate(), "token"); + result = accessTokenService.updateToken(getLoginUser(), 2,Integer.MAX_VALUE,getDate(),"token"); logger.info(result.toString()); Assert.assertEquals(Status.ACCESS_TOKEN_NOT_EXIST, result.get(Constants.STATUS)); } + + private User getLoginUser(){ + User loginUser = new User(); + loginUser.setId(1); + loginUser.setUserType(UserType.ADMIN_USER); + return loginUser; + } + /** * create entity */ diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java index 407f6b587f..14cd394611 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java @@ -139,6 +139,10 @@ public class ResourcesServiceTest { Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg()); //PARENT_RESOURCE_NOT_EXIST + user.setId(1); + user.setTenantId(1); + Mockito.when(userMapper.selectById(1)).thenReturn(getUser()); + Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(resourcesMapper.selectById(Mockito.anyInt())).thenReturn(null); result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,1,"/"); @@ -159,19 +163,19 @@ public class ResourcesServiceTest { PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); User user = new User(); //HDFS_NOT_STARTUP - Result result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE); + Result result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg()); //RESOURCE_NOT_EXIST Mockito.when(resourcesMapper.selectById(1)).thenReturn(getResource()); PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); - result = resourcesService.updateResource(user,0,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE); + result = resourcesService.updateResource(user,0,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg()); //USER_NO_OPERATION_PERM - result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(),result.getMsg()); @@ -186,7 +190,7 @@ public class ResourcesServiceTest { } catch (IOException e) { logger.error(e.getMessage(),e); } - result = resourcesService.updateResource(user, 1, "ResourcesServiceTest1.jar", "ResourcesServiceTest", ResourceType.UDF); + result = resourcesService.updateResource(user, 1, "ResourcesServiceTest1.jar", "ResourcesServiceTest", ResourceType.UDF,null); Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg()); //SUCCESS @@ -199,25 +203,25 @@ public class ResourcesServiceTest { logger.error(e.getMessage(),e); } - result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg()); //RESOURCE_EXIST Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList()); - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg()); //USER_NOT_EXIST Mockito.when(userMapper.selectById(Mockito.anyInt())).thenReturn(null); - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF,null); logger.info(result.toString()); Assert.assertTrue(Status.USER_NOT_EXIST.getCode() == result.getCode()); //TENANT_NOT_EXIST Mockito.when(userMapper.selectById(1)).thenReturn(getUser()); Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(null); - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF,null); logger.info(result.toString()); Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(),result.getMsg()); @@ -231,7 +235,7 @@ public class ResourcesServiceTest { logger.error(e.getMessage(),e); } - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF,null); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java index c4d3d6e126..d86c2cc93e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java @@ -225,13 +225,13 @@ public class UsersServiceTest { String userPassword = "userTest0001"; try { //user not exist - Map result = usersService.updateUser(0,userName,userPassword,"3443@qq.com",1,"13457864543","queue", 1); + Map result = usersService.updateUser(getLoginUser(), 0,userName,userPassword,"3443@qq.com",1,"13457864543","queue", 1); Assert.assertEquals(Status.USER_NOT_EXIST, result.get(Constants.STATUS)); logger.info(result.toString()); //success when(userMapper.selectById(1)).thenReturn(getUser()); - result = usersService.updateUser(1,userName,userPassword,"32222s@qq.com",1,"13457864543","queue", 1); + result = usersService.updateUser(getLoginUser(), 1,userName,userPassword,"32222s@qq.com",1,"13457864543","queue", 1); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } catch (Exception e) { @@ -357,6 +357,12 @@ public class UsersServiceTest { } + private User getLoginUser(){ + User loginUser = new User(); + loginUser.setId(1); + loginUser.setUserType(UserType.ADMIN_USER); + return loginUser; + } @Test public void getUserInfo(){ diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index 2ade59550f..52f2361581 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -21,7 +21,7 @@ org.apache.dolphinscheduler dolphinscheduler - 1.3.2-SNAPSHOT + 1.3.4-SNAPSHOT dolphinscheduler-common dolphinscheduler-common diff --git a/dolphinscheduler-dao/pom.xml b/dolphinscheduler-dao/pom.xml index c474f6d992..1381d7f63e 100644 --- a/dolphinscheduler-dao/pom.xml +++ b/dolphinscheduler-dao/pom.xml @@ -21,7 +21,7 @@ org.apache.dolphinscheduler dolphinscheduler - 1.3.2-SNAPSHOT + 1.3.4-SNAPSHOT dolphinscheduler-dao ${project.artifactId} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java index a2ce6b29b8..b7351f4b49 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java @@ -100,5 +100,12 @@ public interface UdfFuncMapper extends BaseMapper { */ List listAuthorizedUdfByResourceId(@Param("userId") int userId,@Param("resourceIds") int[] resourceIds); + /** + * batch update udf func + * @param udfFuncList udf list + * @return update num + */ + int batchUpdateUdfFunc(@Param("udfFuncList") List udfFuncList); + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index d3b829cb4f..68d547f0a0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; +import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -165,9 +166,6 @@ public class DagHelper { resultList.add(startNode); } if (CollectionUtils.isEmpty(depList)) { - if (null != startNode) { - visitedNodeNameList.add(startNode.getName()); - } return resultList; } for (String depNodeName : depList) { @@ -252,71 +250,86 @@ public class DagHelper { return null; } - /** - * get start vertex in one dag - * it would find the post node if the start vertex is forbidden running - * @param parentNodeName previous node + * the task can be submit when all the depends nodes are forbidden or complete + * @param taskNode taskNode * @param dag dag * @param completeTaskList completeTaskList - * @return start Vertex list + * @return can submit */ - public static Collection getStartVertex(String parentNodeName, DAG dag, - Map completeTaskList){ - - if(completeTaskList == null){ - completeTaskList = new HashMap<>(); + public static boolean allDependsForbiddenOrEnd(TaskNode taskNode, + DAG dag, + Map skipTaskNodeList, + Map completeTaskList) { + List dependList = taskNode.getDepList(); + if (dependList == null) { + return true; } - Collection startVertexs = null; - if(StringUtils.isNotEmpty(parentNodeName)){ - startVertexs = dag.getSubsequentNodes(parentNodeName); - }else{ - startVertexs = dag.getBeginNode(); + for (String dependNodeName : dependList) { + TaskNode dependNode = dag.getNode(dependNodeName); + if (completeTaskList.containsKey(dependNodeName) + || dependNode.isForbidden() + || skipTaskNodeList.containsKey(dependNodeName)) { + continue; + } else { + return false; + } } + return true; + } - List tmpStartVertexs = new ArrayList<>(); - if(startVertexs!= null){ - tmpStartVertexs.addAll(startVertexs); + /** + * parse the successor nodes of previous node. + * this function parse the condition node to find the right branch. + * also check all the depends nodes forbidden or complete + * @param preNodeName + * @return successor nodes + */ + public static Set parsePostNodes(String preNodeName, + Map skipTaskNodeList, + DAG dag, + Map completeTaskList) { + Set postNodeList = new HashSet<>(); + Collection startVertexes = new ArrayList<>(); + if (preNodeName == null) { + startVertexes = dag.getBeginNode(); + } else if (dag.getNode(preNodeName).isConditionsTask()) { + List conditionTaskList = parseConditionTask(preNodeName, skipTaskNodeList, dag, completeTaskList); + startVertexes.addAll(conditionTaskList); + } else { + startVertexes = dag.getSubsequentNodes(preNodeName); } - - for(String start : startVertexs){ - TaskNode startNode = dag.getNode(start); - if(!startNode.isForbidden() && !completeTaskList.containsKey(start)){ - // the start can be submit if not forbidden and not in complete tasks + for (String subsequent : startVertexes) { + TaskNode taskNode = dag.getNode(subsequent); + if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) { + setTaskNodeSkip(subsequent, dag, completeTaskList, skipTaskNodeList ); continue; } - // then submit the post nodes - Collection postNodes = getStartVertex(start, dag, completeTaskList); - for(String post : postNodes){ - TaskNode postNode = dag.getNode(post); - if(taskNodeCanSubmit(postNode, dag, completeTaskList)){ - tmpStartVertexs.add(post); - } + if (!DagHelper.allDependsForbiddenOrEnd(taskNode, dag, skipTaskNodeList, completeTaskList)) { + continue; } - tmpStartVertexs.remove(start); + if (taskNode.isForbidden() || completeTaskList.containsKey(subsequent)) { + postNodeList.addAll(parsePostNodes(subsequent, skipTaskNodeList, dag, completeTaskList)); + continue; + } + postNodeList.add(subsequent); } - return tmpStartVertexs; + return postNodeList; } /** - * the task can be submit when all the depends nodes are forbidden or complete - * @param taskNode taskNode - * @param dag dag - * @param completeTaskList completeTaskList - * @return can submit + * if all of the task dependence are skipped, skip it too. + * @param taskNode + * @return */ - public static boolean taskNodeCanSubmit(TaskNode taskNode, - DAG dag, - Map completeTaskList) { - - List dependList = taskNode.getDepList(); - if(dependList == null){ - return true; + private static boolean isTaskNodeNeedSkip(TaskNode taskNode, + Map skipTaskNodeList + ){ + if(CollectionUtils.isEmpty(taskNode.getDepList())){ + return false; } - - for(String dependNodeName : dependList){ - TaskNode dependNode = dag.getNode(dependNodeName); - if(!dependNode.isForbidden() && !completeTaskList.containsKey(dependNodeName)){ + for(String depNode : taskNode.getDepList()){ + if(!skipTaskNodeList.containsKey(depNode)){ return false; } } @@ -324,6 +337,66 @@ public class DagHelper { } + /** + * parse condition task find the branch process + * set skip flag for another one. + * @param nodeName + * @return + */ + public static List parseConditionTask(String nodeName, + Map skipTaskNodeList, + DAG dag, + Map completeTaskList){ + List conditionTaskList = new ArrayList<>(); + TaskNode taskNode = dag.getNode(nodeName); + if (!taskNode.isConditionsTask()){ + return conditionTaskList; + } + if (!completeTaskList.containsKey(nodeName)){ + return conditionTaskList; + } + TaskInstance taskInstance = completeTaskList.get(nodeName); + ConditionsParameters conditionsParameters = + JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class); + List skipNodeList = new ArrayList<>(); + if(taskInstance.getState().typeIsSuccess()){ + conditionTaskList = conditionsParameters.getSuccessNode(); + skipNodeList = conditionsParameters.getFailedNode(); + }else if(taskInstance.getState().typeIsFailure()){ + conditionTaskList = conditionsParameters.getFailedNode(); + skipNodeList = conditionsParameters.getSuccessNode(); + }else{ + conditionTaskList.add(nodeName); + } + for(String failedNode : skipNodeList){ + setTaskNodeSkip(failedNode, dag, completeTaskList, skipTaskNodeList); + } + return conditionTaskList; + } + + /** + * set task node and the post nodes skip flag + * @param skipNodeName + * @param dag + * @param completeTaskList + * @param skipTaskNodeList + */ + private static void setTaskNodeSkip(String skipNodeName, + DAG dag, + Map completeTaskList, + Map skipTaskNodeList){ + skipTaskNodeList.putIfAbsent(skipNodeName, dag.getNode(skipNodeName)); + Collection postNodeList = dag.getSubsequentNodes(skipNodeName); + for(String post : postNodeList){ + TaskNode postNode = dag.getNode(post); + if(isTaskNodeNeedSkip(postNode, skipTaskNodeList)){ + setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList); + } + } + } + + + /*** * build dag graph * @param processDag processDag @@ -383,7 +456,7 @@ public class DagHelper { */ public static boolean haveConditionsAfterNode(String parentNodeName, DAG dag - ){ + ){ boolean result = false; Set subsequentNodes = dag.getSubsequentNodes(parentNodeName); if(CollectionUtils.isEmpty(subsequentNodes)){ diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml index e38d1637d6..18de6db620 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml @@ -111,4 +111,17 @@ + + + + update t_ds_udfs + + resource_name=#{udf.resourceName}, + update_time=#{udf.updateTime} + + + id=#{udf.id} + + + \ No newline at end of file diff --git a/dolphinscheduler-dist/pom.xml b/dolphinscheduler-dist/pom.xml index b193dd85c8..c461564dbf 100644 --- a/dolphinscheduler-dist/pom.xml +++ b/dolphinscheduler-dist/pom.xml @@ -20,7 +20,7 @@ dolphinscheduler org.apache.dolphinscheduler - 1.3.2-SNAPSHOT + 1.3.4-SNAPSHOT 4.0.0 diff --git a/dolphinscheduler-microbench/pom.xml b/dolphinscheduler-microbench/pom.xml index 6b11b2e2d6..606ecd3c38 100644 --- a/dolphinscheduler-microbench/pom.xml +++ b/dolphinscheduler-microbench/pom.xml @@ -21,7 +21,7 @@ dolphinscheduler org.apache.dolphinscheduler - 1.3.2-SNAPSHOT + 1.3.4-SNAPSHOT 4.0.0 diff --git a/dolphinscheduler-plugin-api/pom.xml b/dolphinscheduler-plugin-api/pom.xml index 7db15e73c3..0c2547d137 100644 --- a/dolphinscheduler-plugin-api/pom.xml +++ b/dolphinscheduler-plugin-api/pom.xml @@ -23,7 +23,7 @@ org.apache.dolphinscheduler dolphinscheduler - 1.3.2-SNAPSHOT + 1.3.4-SNAPSHOT dolphinscheduler-plugin-api ${project.artifactId} diff --git a/dolphinscheduler-remote/pom.xml b/dolphinscheduler-remote/pom.xml index 4d398f3069..d4402d61cc 100644 --- a/dolphinscheduler-remote/pom.xml +++ b/dolphinscheduler-remote/pom.xml @@ -20,7 +20,7 @@ dolphinscheduler org.apache.dolphinscheduler - 1.3.2-SNAPSHOT + 1.3.4-SNAPSHOT 4.0.0 diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index 4f477fb467..77c6064e5b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -78,6 +78,16 @@ public enum CommandType { */ TASK_EXECUTE_RESPONSE, + /** + * db task ack + */ + DB_TASK_ACK, + + /** + * db task response + */ + DB_TASK_RESPONSE, + /** * kill task */ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java index f37eb979fc..095e38697d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java @@ -16,7 +16,7 @@ */ package org.apache.dolphinscheduler.remote.command; -import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; @@ -56,7 +56,7 @@ public class DBTaskAckCommand implements Serializable { public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.DB_TASK_ACK); - byte[] body = FastJsonSerializer.serialize(this); + byte[] body = JsonSerializer.serialize(this); command.setBody(body); return command; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java index a64029822c..56712e4c93 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java @@ -16,7 +16,7 @@ */ package org.apache.dolphinscheduler.remote.command; -import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; @@ -56,7 +56,7 @@ public class DBTaskResponseCommand implements Serializable { public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.DB_TASK_RESPONSE); - byte[] body = FastJsonSerializer.serialize(this); + byte[] body = JsonSerializer.serialize(this); command.setBody(body); return command; } diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml index 4cbce0ab47..10fa58faa3 100644 --- a/dolphinscheduler-server/pom.xml +++ b/dolphinscheduler-server/pom.xml @@ -21,7 +21,7 @@ org.apache.dolphinscheduler dolphinscheduler - 1.3.2-SNAPSHOT + 1.3.4-SNAPSHOT dolphinscheduler-server dolphinscheduler-server diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java new file mode 100644 index 0000000000..11598d9ace --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.utils.DependentUtils; +import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.utils.LogUtils; + +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class ConditionsTaskExecThread extends MasterBaseTaskExecThread { + + /** + * dependent parameters + */ + private DependentParameters dependentParameters; + + /** + * complete task map + */ + private Map completeTaskList = new ConcurrentHashMap<>(); + + /** + * condition result + */ + private DependResult conditionResult; + + /** + * constructor of MasterBaseTaskExecThread + * + * @param taskInstance task instance + */ + public ConditionsTaskExecThread(TaskInstance taskInstance) { + super(taskInstance); + taskInstance.setStartTime(new Date()); + } + + @Override + public Boolean submitWaitComplete() { + try{ + this.taskInstance = submit(); + logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, + taskInstance.getProcessDefinitionId(), + taskInstance.getProcessInstanceId(), + taskInstance.getId())); + String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); + Thread.currentThread().setName(threadLoggerInfoName); + initTaskParameters(); + logger.info("dependent task start"); + waitTaskQuit(); + updateTaskState(); + }catch (Exception e){ + logger.error("conditions task run exception" , e); + } + return true; + } + + private void waitTaskQuit() { + List taskInstances = processService.findValidTaskListByProcessId( + taskInstance.getProcessInstanceId() + ); + for(TaskInstance task : taskInstances){ + completeTaskList.putIfAbsent(task.getName(), task.getState()); + } + + List modelResultList = new ArrayList<>(); + for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){ + + List itemDependResult = new ArrayList<>(); + for(DependentItem item : dependentTaskModel.getDependItemList()){ + itemDependResult.add(getDependResultForItem(item)); + } + DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult); + modelResultList.add(modelResult); + } + conditionResult = DependentUtils.getDependResultForRelation( + dependentParameters.getRelation(), modelResultList + ); + logger.info("the conditions task depend result : {}", conditionResult); + } + + /** + * + */ + private void updateTaskState() { + ExecutionStatus status; + if(this.cancel){ + status = ExecutionStatus.KILL; + }else{ + status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE; + } + taskInstance.setState(status); + taskInstance.setEndTime(new Date()); + processService.updateTaskInstance(taskInstance); + } + + private void initTaskParameters() { + this.taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance)); + this.taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); + taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + taskInstance.setStartTime(new Date()); + this.processService.saveTaskInstance(taskInstance); + + this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), DependentParameters.class); + } + + /** + * depend result for depend item + * @param item + * @return + */ + private DependResult getDependResultForItem(DependentItem item){ + + DependResult dependResult = DependResult.SUCCESS; + if(!completeTaskList.containsKey(item.getDepTasks())){ + logger.info("depend item: {} have not completed yet.", item.getDepTasks()); + dependResult = DependResult.FAILED; + return dependResult; + } + ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks()); + if(executionStatus != item.getStatus()){ + logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus(), executionStatus); + dependResult = DependResult.FAILED; + } + logger.info("dependent item complete {} {},{}", + Constants.DEPENDENT_SPLIT, item.getDepTasks(), dependResult); + return dependResult; + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index ea3ad19950..f5c3708af1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -14,14 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.dolphinscheduler.server.master.runner; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -34,6 +37,11 @@ import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.dolphinscheduler.common.Constants.*; + +import java.util.Date; +import java.util.concurrent.Callable; + /** * master task exec base class @@ -81,10 +89,19 @@ public class MasterBaseTaskExecThread implements Callable { */ private TaskPriorityQueue taskUpdateQueue; + /** + * whether need check task time out. + */ + protected boolean checkTimeoutFlag = false; + + /** + * task timeout parameters + */ + protected TaskTimeoutParameter taskTimeoutParameter; + /** * constructor of MasterBaseTaskExecThread - * - * @param taskInstance task instance + * @param taskInstance task instance */ public MasterBaseTaskExecThread(TaskInstance taskInstance) { this.processService = SpringApplicationContext.getBean(ProcessService.class); @@ -93,6 +110,27 @@ public class MasterBaseTaskExecThread implements Callable { this.taskInstance = taskInstance; this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class); + initTaskParams(); + } + + /** + * init task ordinary parameters + */ + private void initTaskParams() { + initTimeoutParams(); + } + + /** + * init task timeout parameters + */ + private void initTimeoutParams() { + String taskJson = taskInstance.getTaskJson(); + TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); + taskTimeoutParameter = taskNode.getTaskTimeoutParameter(); + + if(taskTimeoutParameter.getEnable()){ + checkTimeoutFlag = true; + } } /** @@ -113,7 +151,6 @@ public class MasterBaseTaskExecThread implements Callable { /** * submit master base task exec thread - * * @return TaskInstance */ protected TaskInstance submit() { @@ -154,14 +191,13 @@ public class MasterBaseTaskExecThread implements Callable { return task; } - /** * dispatcht task - * * @param taskInstance taskInstance * @return whether submit task success */ public Boolean dispatchTask(TaskInstance taskInstance) { + try{ if(taskInstance.isConditionsTask() || taskInstance.isDependTask() @@ -198,7 +234,6 @@ public class MasterBaseTaskExecThread implements Callable { } } - /** * buildTaskPriorityInfo * @@ -227,7 +262,6 @@ public class MasterBaseTaskExecThread implements Callable { /** * submit wait complete - * * @return true */ protected Boolean submitWaitComplete() { @@ -236,7 +270,6 @@ public class MasterBaseTaskExecThread implements Callable { /** * call - * * @return boolean * @throws Exception exception */ @@ -246,4 +279,56 @@ public class MasterBaseTaskExecThread implements Callable { return submitWaitComplete(); } + /** + * alert time out + * @return + */ + protected boolean alertTimeout(){ + if( TaskTimeoutStrategy.FAILED == this.taskTimeoutParameter.getStrategy()){ + return true; + } + logger.warn("process id:{} process name:{} task id: {},name:{} execution time out", + processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); + // send warn mail + ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); + alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(),processDefine.getReceivers(), + processDefine.getReceiversCc(), processInstance.getId(), processInstance.getName(), + taskInstance.getId(),taskInstance.getName()); + return true; + } + + /** + * handle time out for time out strategy warn&&failed + */ + protected void handleTimeoutFailed(){ + if(TaskTimeoutStrategy.WARN == this.taskTimeoutParameter.getStrategy()){ + return; + } + logger.info("process id:{} name:{} task id:{} name:{} cancel because of timeout.", + processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); + this.cancel = true; + } + + /** + * check task remain time valid + * @return + */ + protected boolean checkTaskTimeout(){ + if (!checkTimeoutFlag || taskInstance.getStartTime() == null){ + return false; + } + long remainTime = getRemainTime(taskTimeoutParameter.getInterval() * 60L); + return remainTime <= 0; + } + + /** + * get remain time + * + * @return remain time + */ + protected long getRemainTime(long timeoutSeconds) { + Date startTime = taskInstance.getStartTime(); + long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000; + return timeoutSeconds - usedTime; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 3c28e16651..2bc8031bce 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -68,6 +68,7 @@ import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -348,11 +349,12 @@ public class MasterExecThread implements Runnable { * @throws Exception exception */ private void prepareProcess() throws Exception { - // init task queue - initTaskQueue(); // gen process dag buildFlowDag(); + + // init task queue + initTaskQueue(); logger.info("prepare process :{} end", processInstance.getId()); } @@ -407,6 +409,9 @@ public class MasterExecThread implements Runnable { if(task.isTaskComplete()){ completeTaskList.put(task.getName(), task); } + if(task.isConditionsTask() || DagHelper.haveConditionsAfterNode(task.getName(), dag)){ + continue; + } if(task.getState().typeIsFailure() && !task.taskCanRetry()){ errorTaskList.put(task.getName(), task); } @@ -498,6 +503,9 @@ public class MasterExecThread implements Runnable { // task instance whether alert taskInstance.setAlertFlag(Flag.NO); + // task instance start time + taskInstance.setStartTime(null); + // task instance flag taskInstance.setFlag(Flag.YES); @@ -532,132 +540,13 @@ public class MasterExecThread implements Runnable { return taskInstance; } - - - /** - * if all of the task dependence are skip, skip it too. - * @param taskNode - * @return - */ - private boolean isTaskNodeNeedSkip(TaskNode taskNode){ - if(CollectionUtils.isEmpty(taskNode.getDepList())){ - return false; - } - for(String depNode : taskNode.getDepList()){ - if(!skipTaskNodeList.containsKey(depNode)){ - return false; - } - } - return true; - } - - /** - * set task node skip if dependence all skip - * @param taskNodesSkipList - */ - private void setTaskNodeSkip(List taskNodesSkipList){ - for(String skipNode : taskNodesSkipList){ - skipTaskNodeList.putIfAbsent(skipNode, dag.getNode(skipNode)); - Collection postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList); - List postSkipList = new ArrayList<>(); - for(String post : postNodeList){ - TaskNode postNode = dag.getNode(post); - if(isTaskNodeNeedSkip(postNode)){ - postSkipList.add(post); - } - } - setTaskNodeSkip(postSkipList); - } - } - - - /** - * parse condition task find the branch process - * set skip flag for another one. - * @param nodeName - * @return - */ - private List parseConditionTask(String nodeName){ - List conditionTaskList = new ArrayList<>(); - TaskNode taskNode = dag.getNode(nodeName); - if(!taskNode.isConditionsTask()){ - return conditionTaskList; - } - ConditionsParameters conditionsParameters = - JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class); - - TaskInstance taskInstance = completeTaskList.get(nodeName); - if(taskInstance == null){ - logger.error("task instance {} cannot find, please check it!", nodeName); - return conditionTaskList; - } - - if(taskInstance.getState().typeIsSuccess()){ - conditionTaskList = conditionsParameters.getSuccessNode(); - setTaskNodeSkip(conditionsParameters.getFailedNode()); - }else if(taskInstance.getState().typeIsFailure()){ - conditionTaskList = conditionsParameters.getFailedNode(); - setTaskNodeSkip(conditionsParameters.getSuccessNode()); - }else{ - conditionTaskList.add(nodeName); - } - return conditionTaskList; - } - - /** - * parse post node list of previous node - * if condition node: return process according to the settings - * if post node completed, return post nodes of the completed node - * @param previousNodeName - * @return - */ - private List parsePostNodeList(String previousNodeName){ - List postNodeList = new ArrayList<>(); - - TaskNode taskNode = dag.getNode(previousNodeName); - if(taskNode != null && taskNode.isConditionsTask()){ - return parseConditionTask(previousNodeName); - } - Collection postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList); - List postSkipList = new ArrayList<>(); - // delete success node, parse the past nodes - // if conditions node, - // 1. parse the branch process according the conditions setting - // 2. set skip flag on anther branch process - for(String postNode : postNodeCollection){ - if(completeTaskList.containsKey(postNode)){ - TaskInstance postTaskInstance = completeTaskList.get(postNode); - if(dag.getNode(postNode).isConditionsTask()){ - List conditionTaskNodeList = parseConditionTask(postNode); - for(String conditions : conditionTaskNodeList){ - postNodeList.addAll(parsePostNodeList(conditions)); - } - }else if(postTaskInstance.getState().typeIsSuccess()){ - postNodeList.addAll(parsePostNodeList(postNode)); - }else{ - postNodeList.add(postNode); - } - - }else if(isTaskNodeNeedSkip(dag.getNode(postNode))){ - postSkipList.add(postNode); - setTaskNodeSkip(postSkipList); - postSkipList.clear(); - }else{ - postNodeList.add(postNode); - } - } - return postNodeList; - } - /** * submit post node * @param parentNodeName parent node name */ private Map propToValue = new ConcurrentHashMap(); private void submitPostNode(String parentNodeName){ - - List submitTaskNodeList = parsePostNodeList(parentNodeName); - + Set submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList); List taskInstances = new ArrayList<>(); for(String taskNode : submitTaskNodeList){ try { @@ -702,7 +591,6 @@ public class MasterExecThread implements Runnable { if(startNodes.contains(taskName)){ return DependResult.SUCCESS; } - TaskNode taskNode = dag.getNode(taskName); List depNameList = taskNode.getDepList(); for(String depsNode : depNameList ){ @@ -716,23 +604,42 @@ public class MasterExecThread implements Runnable { return DependResult.WAITING; } ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); - // conditions task would not return failed. - if(depTaskState.typeIsFailure() - && !DagHelper.haveConditionsAfterNode(depsNode, dag ) - && !dag.getNode(depsNode).isConditionsTask()){ - return DependResult.FAILED; - } - if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ return DependResult.WAITING; } + // ignore task state if current task is condition + if(taskNode.isConditionsTask()){ + continue; + } + if(!dependTaskSuccess(depsNode, taskName)){ + return DependResult.FAILED; + } } - logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray())); - return DependResult.SUCCESS; } + /** + * depend node is completed, but here need check the condition task branch is the next node + * @param dependNodeName + * @param nextNodeName + * @return + */ + private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){ + if(dag.getNode(dependNodeName).isConditionsTask()){ + //condition task need check the branch to run + List nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeList, dag, completeTaskList); + if(!nextTaskList.contains(nextNodeName)){ + return false; + } + }else { + ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState(); + if(depTaskState.typeIsFailure()){ + return false; + } + } + return true; + } /** * query task instance by complete state @@ -889,6 +796,24 @@ public class MasterExecThread implements Runnable { return state; } + /** + * whether standby task list have retry tasks + * @return + */ + private boolean retryTaskExists() { + + boolean result = false; + + for(String taskName : readyToSubmitTaskList.keySet()){ + TaskInstance task = readyToSubmitTaskList.get(taskName); + if(task.getState().typeIsFailure()){ + result = true; + break; + } + } + return result; + } + /** * whether complement end * @return Boolean whether is complement end @@ -976,7 +901,7 @@ public class MasterExecThread implements Runnable { // submit start node submitPostNode(null); boolean sendTimeWarning = false; - while(!processInstance.isProcessInstanceStop()){ + while(!processInstance.isProcessInstanceStop() && Stopper.isRunning()){ // send warning email if process time out. if(!sendTimeWarning && checkProcessTimeOut(processInstance) ){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 662dc13414..89af95278c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -202,6 +202,9 @@ public abstract class AbstractCommandExecutor { return result; } + public String getVarPool() { + return varPool.toString(); + } /** * cancel application diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml index f0e8f408d7..ef41d907c2 100644 --- a/dolphinscheduler-service/pom.xml +++ b/dolphinscheduler-service/pom.xml @@ -20,7 +20,7 @@ dolphinscheduler org.apache.dolphinscheduler - 1.3.2-SNAPSHOT + 1.3.4-SNAPSHOT 4.0.0 diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 10069aea10..10a2a0118d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -79,6 +79,8 @@ import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; +import org.apache.commons.lang.ArrayUtils; + import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; @@ -358,32 +360,39 @@ public class ProcessService { * remove task log file * @param processInstanceId processInstanceId */ - public void removeTaskLogFile(Integer processInstanceId) { + public void removeTaskLogFile(Integer processInstanceId){ - LogClientService logClient = new LogClientService(); + LogClientService logClient = null; - List taskInstanceList = findValidTaskListByProcessId(processInstanceId); + try { + logClient = new LogClientService(); + List taskInstanceList = findValidTaskListByProcessId(processInstanceId); - if (CollectionUtils.isEmpty(taskInstanceList)) { - return; - } + if (CollectionUtils.isEmpty(taskInstanceList)) { + return; + } - for (TaskInstance taskInstance : taskInstanceList) { - String taskLogPath = taskInstance.getLogPath(); - if (StringUtils.isEmpty(taskInstance.getHost())) { - continue; + for (TaskInstance taskInstance : taskInstanceList) { + String taskLogPath = taskInstance.getLogPath(); + if (StringUtils.isEmpty(taskInstance.getHost())) { + continue; + } + int port = Constants.RPC_PORT; + String ip = ""; + try { + ip = Host.of(taskInstance.getHost()).getIp(); + } catch (Exception e) { + // compatible old version + ip = taskInstance.getHost(); + } + + // remove task log from loggerserver + logClient.removeTaskLog(ip, port, taskLogPath); } - int port = Constants.RPC_PORT; - String ip = ""; - try { - ip = Host.of(taskInstance.getHost()).getIp(); - } catch (Exception e) { - // compatible old version - ip = taskInstance.getHost(); + }finally { + if (logClient != null) { + logClient.close(); } - - // remove task log from loggerserver - logClient.removeTaskLog(ip,port,taskLogPath); } } @@ -457,6 +466,7 @@ public class ProcessService { processInstance.getWarningType(), processInstance.getWarningGroupId(), processInstance.getScheduleTime(), + processInstance.getWorkerGroup(), processInstance.getProcessInstancePriority() ); saveCommand(command); @@ -1031,6 +1041,7 @@ public class ProcessService { parentProcessInstance.getWarningType(), parentProcessInstance.getWarningGroupId(), parentProcessInstance.getScheduleTime(), + task.getWorkerGroup(), parentProcessInstance.getProcessInstancePriority() ); } @@ -1641,8 +1652,10 @@ public class ProcessService { * @param resourceType resource type * @return tenant code */ - public String queryTenantCodeByResName(String resName,ResourceType resourceType) { - return resourceMapper.queryTenantCodeByResourceName(resName, resourceType.ordinal()); + public String queryTenantCodeByResName(String resName,ResourceType resourceType){ + // in order to query tenant code successful although the version is older + String fullName = resName.startsWith("/") ? resName : String.format("/%s",resName); + return resourceMapper.queryTenantCodeByResourceName(fullName, resourceType.ordinal()); } /** @@ -1679,7 +1692,7 @@ public class ProcessService { */ public List getCycleDependencies(int masterId,int[] ids,Date scheduledFireTime) throws Exception { List cycleDependencyList = new ArrayList(); - if (ids == null || ids.length == 0) { + if(ArrayUtils.isEmpty(ids)){ logger.warn("ids[] is empty!is invalid!"); return cycleDependencyList; } diff --git a/dolphinscheduler-ui/pom.xml b/dolphinscheduler-ui/pom.xml index 13644bad91..8e644de265 100644 --- a/dolphinscheduler-ui/pom.xml +++ b/dolphinscheduler-ui/pom.xml @@ -20,7 +20,7 @@ dolphinscheduler org.apache.dolphinscheduler - 1.3.2-SNAPSHOT + 1.3.4-SNAPSHOT 4.0.0 diff --git a/pom.xml b/pom.xml index f7609611f4..98069406a1 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.dolphinscheduler dolphinscheduler - 1.3.2-SNAPSHOT + 1.3.4-SNAPSHOT pom ${project.artifactId} http://dolphinscheduler.apache.org