Browse Source

merge from 1.3.3-release

pull/3/MERGE
baoliang 4 years ago
parent
commit
403a4a20f2
  1. 2
      dolphinscheduler-alert/pom.xml
  2. 2
      dolphinscheduler-api/pom.xml
  3. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AccessTokenController.java
  4. 50
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
  5. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/UsersController.java
  6. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AccessTokenService.java
  7. 181
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  8. 20
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java
  9. 51
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java
  10. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  11. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
  12. 16
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AccessTokenServiceTest.java
  13. 22
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  14. 10
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
  15. 2
      dolphinscheduler-common/pom.xml
  16. 2
      dolphinscheduler-dao/pom.xml
  17. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java
  18. 177
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  19. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml
  20. 2
      dolphinscheduler-dist/pom.xml
  21. 2
      dolphinscheduler-microbench/pom.xml
  22. 2
      dolphinscheduler-plugin-api/pom.xml
  23. 2
      dolphinscheduler-remote/pom.xml
  24. 10
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  25. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java
  26. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java
  27. 2
      dolphinscheduler-server/pom.xml
  28. 159
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
  29. 103
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  30. 191
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  31. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  32. 2
      dolphinscheduler-service/pom.xml
  33. 59
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  34. 2
      dolphinscheduler-ui/pom.xml
  35. 2
      pom.xml

2
dolphinscheduler-alert/pom.xml

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler</artifactId>
<version>1.3.2-SNAPSHOT</version>
<version>1.3.4-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-alert</artifactId>
<name>${project.artifactId}</name>

2
dolphinscheduler-api/pom.xml

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler</artifactId>
<version>1.3.2-SNAPSHOT</version>
<version>1.3.4-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-api</artifactId>
<name>${project.artifactId}</name>

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AccessTokenController.java

@ -86,7 +86,7 @@ public class AccessTokenController extends BaseController {
logger.info("login user {}, create token , userId : {} , token expire time : {} , token : {}", loginUser.getUserName(),
userId, expireTime, token);
Map<String, Object> result = accessTokenService.createToken(userId, expireTime, token);
Map<String, Object> result = accessTokenService.createToken(loginUser, userId, expireTime, token);
return returnDataList(result);
}
@ -106,7 +106,7 @@ public class AccessTokenController extends BaseController {
@RequestParam(value = "userId") int userId,
@RequestParam(value = "expireTime") String expireTime) {
logger.info("login user {}, generate token , userId : {} , token expire time : {}", loginUser, userId, expireTime);
Map<String, Object> result = accessTokenService.generateToken(userId, expireTime);
Map<String, Object> result = accessTokenService.generateToken(loginUser, userId, expireTime);
return returnDataList(result);
}
@ -185,7 +185,7 @@ public class AccessTokenController extends BaseController {
logger.info("login user {}, update token , userId : {} , token expire time : {} , token : {}", loginUser.getUserName(),
userId, expireTime, token);
Map<String, Object> result = accessTokenService.updateToken(id, userId, expireTime, token);
Map<String, Object> result = accessTokenService.updateToken(loginUser, id, userId, expireTime, token);
return returnDataList(result);
}

50
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.service.ResourcesService;
import org.apache.dolphinscheduler.api.service.UdfFuncService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
@ -65,21 +66,13 @@ public class ResourcesController extends BaseController {
/**
* create directory
*
* @param loginUser login user
* @param alias alias
* @param description description
* @param type type
* @return create result code
*/
/**
* @param loginUser login user
* @param type type
* @param alias alias
* @param description description
* @param pid parent id
* @param currentDir current directory
* @return
* @return create result code
*/
@ApiOperation(value = "createDirctory", notes = "CREATE_RESOURCE_NOTES")
@ApiImplicitParams({
@ -140,6 +133,7 @@ public class ResourcesController extends BaseController {
* @param resourceId resource id
* @param type resource type
* @param description description
* @param file resource file
* @return update result code
*/
@ApiOperation(value = "updateResource", notes = "UPDATE_RESOURCE_NOTES")
@ -147,7 +141,8 @@ public class ResourcesController extends BaseController {
@ApiImplicitParam(name = "id", value = "RESOURCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType = "ResourceType"),
@ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType = "String")
@ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType = "String"),
@ApiImplicitParam(name = "file", value = "RESOURCE_FILE", required = true, dataType = "MultipartFile")
})
@PostMapping(value = "/update")
@ApiException(UPDATE_RESOURCE_ERROR)
@ -155,10 +150,11 @@ public class ResourcesController extends BaseController {
@RequestParam(value = "id") int resourceId,
@RequestParam(value = "type") ResourceType type,
@RequestParam(value = "name") String alias,
@RequestParam(value = "description", required = false) String description) {
logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}",
loginUser.getUserName(), type, alias, description);
return resourceService.updateResource(loginUser, resourceId, alias, description, type);
@RequestParam(value = "description", required = false) String description,
@RequestParam(value = "file" ,required = false) MultipartFile file) {
logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}, file: {}",
loginUser.getUserName(), type, alias, description, file);
return resourceService.updateResource(loginUser, resourceId, alias, description, type, file);
}
/**
@ -280,7 +276,7 @@ public class ResourcesController extends BaseController {
* @param type resource type
* @return resource list
*/
@ApiOperation(value = "queryResourceJarList", notes = "QUERY_RESOURCE_LIST_NOTES")
@ApiOperation(value = "queryResourceByProgramType", notes = "QUERY_RESOURCE_LIST_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType = "ResourceType")
})
@ -288,10 +284,14 @@ public class ResourcesController extends BaseController {
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_RESOURCES_LIST_ERROR)
public Result queryResourceJarList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "type") ResourceType type
@RequestParam(value = "type") ResourceType type,
@RequestParam(value = "programType",required = false) ProgramType programType
) {
logger.info("query resource list, login user:{}, resource type:{}", loginUser.getUserName(), type.toString());
Map<String, Object> result = resourceService.queryResourceJarList(loginUser, type);
String programTypeName = programType == null ? "" : programType.name();
String userName = loginUser.getUserName();
userName = userName.replaceAll("[\n|\r|\t]", "_");
logger.info("query resource list, login user:{}, resource type:{}, program type:{}", userName,programTypeName);
Map<String, Object> result = resourceService.queryResourceByProgramType(loginUser, type,programType);
return returnDataList(result);
}
@ -569,7 +569,7 @@ public class ResourcesController extends BaseController {
@GetMapping(value = "/udf-func/list-paging")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_UDF_FUNCTION_LIST_PAGING_ERROR)
public Result queryUdfFuncList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result<Object> queryUdfFuncListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("pageNo") Integer pageNo,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam("pageSize") Integer pageSize
@ -586,23 +586,25 @@ public class ResourcesController extends BaseController {
}
/**
* query resource list by type
* query udf func list by type
*
* @param loginUser login user
* @param type resource type
* @return resource list
*/
@ApiOperation(value = "queryResourceList", notes = "QUERY_RESOURCE_LIST_NOTES")
@ApiOperation(value = "queryUdfFuncList", notes = "QUERY_UDF_FUNC_LIST_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "UDF_TYPE", required = true, dataType = "UdfType")
})
@GetMapping(value = "/udf-func/list")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_DATASOURCE_BY_TYPE_ERROR)
public Result queryResourceList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result<Object> queryUdfFuncList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("type") UdfType type) {
logger.info("query datasource list, user:{}, type:{}", loginUser.getUserName(), type);
Map<String, Object> result = udfFuncService.queryResourceList(loginUser, type.ordinal());
String userName = loginUser.getUserName();
userName = userName.replaceAll("[\n|\r|\t]", "_");
logger.info("query udf func list, user:{}, type:{}", userName, type);
Map<String, Object> result = udfFuncService.queryUdfFuncList(loginUser, type.ordinal());
return returnDataList(result);
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/UsersController.java

@ -166,7 +166,7 @@ public class UsersController extends BaseController {
@RequestParam(value = "state", required = false) int state) throws Exception {
logger.info("login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}, state: {}",
loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone, queue, state);
Map<String, Object> result = usersService.updateUser(id, userName, userPassword, email, tenantId, phone, queue, state);
Map<String, Object> result = usersService.updateUser(loginUser, id, userName, userPassword, email, tenantId, phone, queue, state);
return returnDataList(result);
}

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AccessTokenService.java

@ -44,7 +44,8 @@ public interface AccessTokenService {
* @param token token string
* @return create result code
*/
Map<String, Object> createToken(int userId, String expireTime, String token);
Map<String, Object> createToken(User loginUser, int userId, String expireTime, String token);
/**
* generate token
@ -53,7 +54,7 @@ public interface AccessTokenService {
* @param expireTime token expire time
* @return token string
*/
Map<String, Object> generateToken(int userId, String expireTime);
Map<String, Object> generateToken(User loginUser, int userId, String expireTime);
/**
* delete access token
@ -73,5 +74,5 @@ public interface AccessTokenService {
* @param token token string
* @return update result code
*/
Map<String, Object> updateToken(int id, int userId, String expireTime, String token);
Map<String, Object> updateToken(User loginUser, int id, int userId, String expireTime, String token);
}

181
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
@ -87,7 +88,7 @@ public class ResourcesService extends BaseService {
* @param currentDir current directory
* @return create directory result
*/
@Transactional(rollbackFor = RuntimeException.class)
@Transactional(rollbackFor = Exception.class)
public Result createDirectory(User loginUser,
String name,
String description,
@ -101,8 +102,11 @@ public class ResourcesService extends BaseService {
putMsg(result, Status.HDFS_NOT_STARTUP);
return result;
}
String fullName = "/".equals(currentDir) ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
result = verifyResourceName(fullName,type,loginUser);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
if (pid != -1) {
Resource parentResource = resourcesMapper.selectById(pid);
@ -165,7 +169,7 @@ public class ResourcesService extends BaseService {
* @param currentDir current directory
* @return create result code
*/
@Transactional(rollbackFor = RuntimeException.class)
@Transactional(rollbackFor = Exception.class)
public Result createResource(User loginUser,
String name,
String desc,
@ -230,7 +234,7 @@ public class ResourcesService extends BaseService {
}
// check resoure name exists
String fullName = "/".equals(currentDir) ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
if (checkResourceExists(fullName, 0, type.ordinal())) {
logger.error("resource {} has exist, can't recreate", name);
putMsg(result, Status.RESOURCE_EXIST);
@ -288,14 +292,16 @@ public class ResourcesService extends BaseService {
* @param name name
* @param desc description
* @param type resource type
* @param file resource file
* @return update result code
*/
@Transactional(rollbackFor = RuntimeException.class)
@Transactional(rollbackFor = Exception.class)
public Result updateResource(User loginUser,
int resourceId,
String name,
String desc,
ResourceType type) {
ResourceType type,
MultipartFile file) {
Result result = new Result();
// if resource upload startup
@ -315,7 +321,7 @@ public class ResourcesService extends BaseService {
return result;
}
if (name.equals(resource.getAlias()) && desc.equals(resource.getDescription())) {
if (file == null && name.equals(resource.getAlias()) && desc.equals(resource.getDescription())) {
putMsg(result, Status.SUCCESS);
return result;
}
@ -331,6 +337,42 @@ public class ResourcesService extends BaseService {
return result;
}
if (file != null) {
// file is empty
if (file.isEmpty()) {
logger.error("file is empty: {}", file.getOriginalFilename());
putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
return result;
}
// file suffix
String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
String nameSuffix = FileUtils.suffix(name);
// determine file suffix
if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
/**
* rename file suffix and original suffix must be consistent
*/
logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename());
putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
return result;
}
//If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(FileUtils.suffix(originFullName))) {
logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
return result;
}
if (file.getSize() > Constants.MAX_FILE_SIZE) {
logger.error("file size is too large: {}", file.getOriginalFilename());
putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
return result;
}
}
// query tenant by user id
String tenantCode = getTenantCode(resource.getUserId(),result);
if (StringUtils.isEmpty(tenantCode)){
@ -380,31 +422,61 @@ public class ResourcesService extends BaseService {
}
// updateResource data
List<Integer> childrenResource = listAllChildren(resource,false);
Date now = new Date();
resource.setAlias(name);
resource.setFullName(fullName);
resource.setDescription(desc);
resource.setUpdateTime(now);
if (file != null) {
resource.setFileName(file.getOriginalFilename());
resource.setSize(file.getSize());
}
try {
resourcesMapper.updateById(resource);
if (resource.isDirectory() && CollectionUtils.isNotEmpty(childrenResource)) {
String matcherFullName = Matcher.quoteReplacement(fullName);
List<Resource> childResourceList = new ArrayList<>();
List<Resource> resourceList = resourcesMapper.listResourceByIds(childrenResource.toArray(new Integer[childrenResource.size()]));
childResourceList = resourceList.stream().map(t -> {
t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName));
t.setUpdateTime(now);
return t;
}).collect(Collectors.toList());
resourcesMapper.batchUpdateResource(childResourceList);
if (resource.isDirectory()) {
List<Integer> childrenResource = listAllChildren(resource,false);
if (CollectionUtils.isNotEmpty(childrenResource)) {
String matcherFullName = Matcher.quoteReplacement(fullName);
List<Resource> childResourceList = new ArrayList<>();
Integer[] childResIdArray = childrenResource.toArray(new Integer[childrenResource.size()]);
List<Resource> resourceList = resourcesMapper.listResourceByIds(childResIdArray);
childResourceList = resourceList.stream().map(t -> {
t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName));
t.setUpdateTime(now);
return t;
}).collect(Collectors.toList());
resourcesMapper.batchUpdateResource(childResourceList);
if (ResourceType.UDF.equals(resource.getType())) {
List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(childResIdArray);
if (CollectionUtils.isNotEmpty(udfFuncs)) {
udfFuncs = udfFuncs.stream().map(t -> {
t.setResourceName(t.getResourceName().replaceFirst(originFullName, matcherFullName));
t.setUpdateTime(now);
return t;
}).collect(Collectors.toList());
udfFunctionMapper.batchUpdateUdfFunc(udfFuncs);
}
}
}
} else if (ResourceType.UDF.equals(resource.getType())) {
List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(new Integer[]{resourceId});
if (CollectionUtils.isNotEmpty(udfFuncs)) {
udfFuncs = udfFuncs.stream().map(t -> {
t.setResourceName(fullName);
t.setUpdateTime(now);
return t;
}).collect(Collectors.toList());
udfFunctionMapper.batchUpdateUdfFunc(udfFuncs);
}
}
putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
Map<String, Object> resultMap = new HashMap<>();
Map<String, Object> resultMap = new HashMap<>(5);
for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
resultMap.put(entry.getKey().toString(), entry.getValue());
@ -415,11 +487,31 @@ public class ResourcesService extends BaseService {
logger.error(Status.UPDATE_RESOURCE_ERROR.getMsg(), e);
throw new ServiceException(Status.UPDATE_RESOURCE_ERROR);
}
// if name unchanged, return directly without moving on HDFS
if (originResourceName.equals(name)) {
if (originResourceName.equals(name) && file == null) {
return result;
}
if (file != null) {
// fail upload
if (!upload(loginUser, fullName, file, type)) {
logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename());
putMsg(result, Status.HDFS_OPERATION_ERROR);
throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
}
if (!fullName.equals(originFullName)) {
try {
HadoopUtils.getInstance().delete(originHdfsFileName,false);
} catch (IOException e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(String.format("delete resource: %s failed.", originFullName));
}
}
return result;
}
// get the path of dest file in hdfs
String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName);
@ -449,7 +541,7 @@ public class ResourcesService extends BaseService {
*/
public Map<String, Object> queryResourceListPaging(User loginUser, int direcotryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) {
HashMap<String, Object> result = new HashMap<>();
HashMap<String, Object> result = new HashMap<>(5);
Page<Resource> page = new Page(pageNo, pageSize);
int userId = loginUser.getId();
if (isAdmin(loginUser)) {
@ -550,7 +642,7 @@ public class ResourcesService extends BaseService {
*/
public Map<String, Object> queryResourceList(User loginUser, ResourceType type) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
int userId = loginUser.getId();
if(isAdmin(loginUser)){
@ -565,21 +657,33 @@ public class ResourcesService extends BaseService {
}
/**
* query resource list
* query resource list by program type
*
* @param loginUser login user
* @param type resource type
* @return resource list
*/
public Map<String, Object> queryResourceJarList(User loginUser, ResourceType type) {
public Map<String, Object> queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
String suffix = ".jar";
int userId = loginUser.getId();
if(isAdmin(loginUser)){
userId = 0;
}
if (programType != null) {
switch (programType) {
case JAVA:
break;
case SCALA:
break;
case PYTHON:
suffix = ".py";
break;
}
}
List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
List<Resource> resources = new ResourceFilter(".jar",new ArrayList<>(allResourceList)).filter();
List<Resource> resources = new ResourceFilter(suffix,new ArrayList<>(allResourceList)).filter();
Visitor resourceTreeVisitor = new ResourceTreeVisitor(resources);
result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
putMsg(result,Status.SUCCESS);
@ -829,7 +933,7 @@ public class ResourcesService extends BaseService {
* @param content content
* @return create result code
*/
@Transactional(rollbackFor = RuntimeException.class)
@Transactional(rollbackFor = Exception.class)
public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory) {
Result result = new Result();
// if resource upload startup
@ -852,12 +956,25 @@ public class ResourcesService extends BaseService {
}
String name = fileName.trim() + "." + nameSuffix;
String fullName = "/".equals(currentDirectory) ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name);
String fullName = currentDirectory.equals("/") ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name);
result = verifyResourceName(fullName,type,loginUser);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
if (pid != -1) {
Resource parentResource = resourcesMapper.selectById(pid);
if (parentResource == null) {
putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
return result;
}
if (!hasPerm(loginUser, parentResource.getUserId())) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
}
// save data
Date now = new Date();
@ -891,7 +1008,7 @@ public class ResourcesService extends BaseService {
* @param content content
* @return update result cod
*/
@Transactional(rollbackFor = RuntimeException.class)
@Transactional(rollbackFor = Exception.class)
public Result updateResourceContent(int resourceId, String content) {
Result result = new Result();
@ -1096,7 +1213,7 @@ public class ResourcesService extends BaseService {
* @return unauthorized result code
*/
public Map<String, Object> unauthorizedUDFFunction(User loginUser, Integer userId) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
//only admin can operate
if (checkAdmin(loginUser, result)) {
return result;
@ -1148,7 +1265,7 @@ public class ResourcesService extends BaseService {
* @return authorized result
*/
public Map<String, Object> authorizedFile(User loginUser, Integer userId) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
if (checkAdmin(loginUser, result)){
return result;
}

20
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java

@ -148,7 +148,7 @@ public class UdfFuncService extends BaseService{
*/
public Map<String, Object> queryUdfFuncDetail(int id) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
UdfFunc udfFunc = udfFuncMapper.selectById(id);
if (udfFunc == null) {
putMsg(result, Status.RESOURCE_NOT_EXIST);
@ -244,7 +244,7 @@ public class UdfFuncService extends BaseService{
* @return udf function list page
*/
public Map<String, Object> queryUdfFuncListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
@ -276,15 +276,19 @@ public class UdfFuncService extends BaseService{
}
/**
* query data resource by type
* query udf list
*
* @param loginUser login user
* @param type resource type
* @return resource list
* @param type udf type
* @return udf func list
*/
public Map<String, Object> queryResourceList(User loginUser, Integer type) {
Map<String, Object> result = new HashMap<>();
List<UdfFunc> udfFuncList = udfFuncMapper.getUdfFuncByType(loginUser.getId(), type);
public Map<String, Object> queryUdfFuncList(User loginUser, Integer type) {
Map<String, Object> result = new HashMap<>(5);
int userId = loginUser.getId();
if (isAdmin(loginUser)) {
userId = 0;
}
List<UdfFunc> udfFuncList = udfFuncMapper.getUdfFuncByType(userId, type);
result.put(Constants.DATA_LIST, udfFuncList);
putMsg(result, Status.SUCCESS);

51
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java

@ -103,7 +103,7 @@ public class UsersService extends BaseService {
String queue,
int state) throws Exception {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
//check all user params
String msg = this.checkUserParams(userName, userPassword, email, phone);
@ -231,7 +231,7 @@ public class UsersService extends BaseService {
* @return user list page
*/
public Map<String, Object> queryUserList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result;
@ -253,6 +253,8 @@ public class UsersService extends BaseService {
/**
* updateProcessInstance user
*
*
* @param loginUser
* @param userId user id
* @param userName user name
* @param userPassword user password
@ -263,7 +265,7 @@ public class UsersService extends BaseService {
* @return update result code
* @throws Exception exception
*/
public Map<String, Object> updateUser(int userId,
public Map<String, Object> updateUser(User loginUser, int userId,
String userName,
String userPassword,
String email,
@ -271,16 +273,17 @@ public class UsersService extends BaseService {
String phone,
String queue,
int state) throws Exception {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
result.put(Constants.STATUS, false);
if (check(result, !hasPerm(loginUser, userId), Status.USER_NO_OPERATION_PERM)) {
return result;
}
User user = userMapper.selectById(userId);
if (user == null) {
putMsg(result, Status.USER_NOT_EXIST, userId);
return result;
}
if (StringUtils.isNotEmpty(userName)) {
if (!CheckUtils.checkUserName(userName)){
@ -394,7 +397,7 @@ public class UsersService extends BaseService {
* @throws Exception exception when operate hdfs
*/
public Map<String, Object> deleteUserById(User loginUser, int id) throws Exception {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
//only admin can operate
if (!isAdmin(loginUser)) {
putMsg(result, Status.USER_NO_OPERATION_PERM, id);
@ -434,7 +437,7 @@ public class UsersService extends BaseService {
*/
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> grantProject(User loginUser, int userId, String projectIds) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
result.put(Constants.STATUS, false);
//only admin can operate
@ -484,7 +487,7 @@ public class UsersService extends BaseService {
*/
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> grantResources(User loginUser, int userId, String resourceIds) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
//only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result;
@ -581,7 +584,7 @@ public class UsersService extends BaseService {
*/
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> grantUDFFunction(User loginUser, int userId, String udfIds) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
//only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
@ -628,7 +631,7 @@ public class UsersService extends BaseService {
*/
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> grantDataSource(User loginUser, int userId, String datasourceIds) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
result.put(Constants.STATUS, false);
//only admin can operate
@ -708,7 +711,7 @@ public class UsersService extends BaseService {
* @return user list
*/
public Map<String, Object> queryAllGeneralUsers(User loginUser) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
//only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result;
@ -729,7 +732,7 @@ public class UsersService extends BaseService {
* @return user list
*/
public Map<String, Object> queryUserList(User loginUser) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
//only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result;
@ -773,7 +776,7 @@ public class UsersService extends BaseService {
*/
public Map<String, Object> unauthorizedUser(User loginUser, Integer alertgroupId) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
//only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result;
@ -809,7 +812,7 @@ public class UsersService extends BaseService {
* @return authorized result code
*/
public Map<String, Object> authorizedUser(User loginUser, Integer alertgroupId) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
//only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result;
@ -821,24 +824,6 @@ public class UsersService extends BaseService {
return result;
}
/**
* check
*
* @param result result
* @param bool bool
* @param userNoOperationPerm status
* @return check result
*/
private boolean check(Map<String, Object> result, boolean bool, Status userNoOperationPerm) {
//only admin can operate
if (bool) {
result.put(Constants.STATUS, userNoOperationPerm);
result.put(Constants.MSG, userNoOperationPerm.getMsg());
return true;
}
return false;
}
/**
* @param tenantId tenant id
* @return true if tenant exists, otherwise return false

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -881,7 +881,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
processMeta.getProcessDefinitionLocations(),
processMeta.getProcessDefinitionConnects());
putMsg(result, Status.SUCCESS);
} catch (JsonProcessingException e) {
} catch (Exception e) {
logger.error("import process meta json data: {}", e.getMessage(), e);
putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
}

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java

@ -308,6 +308,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
} else {
putMsg(result, Status.TENANT_NOT_EXIST);
}
return result;
}
/**

16
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AccessTokenServiceTest.java

@ -81,7 +81,7 @@ public class AccessTokenServiceTest {
public void testCreateToken() {
when(accessTokenMapper.insert(any(AccessToken.class))).thenReturn(2);
Map<String, Object> result = accessTokenService.createToken(1, getDate(), "AccessTokenServiceTest");
Map<String, Object> result = accessTokenService.createToken(getLoginUser(), 1, getDate(), "AccessTokenServiceTest");
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@ -89,7 +89,7 @@ public class AccessTokenServiceTest {
@Test
public void testGenerateToken() {
Map<String, Object> result = accessTokenService.generateToken(Integer.MAX_VALUE, getDate());
Map<String, Object> result = accessTokenService.generateToken(getLoginUser(), Integer.MAX_VALUE,getDate());
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
String token = (String) result.get(Constants.DATA_LIST);
@ -121,16 +121,24 @@ public class AccessTokenServiceTest {
public void testUpdateToken() {
when(accessTokenMapper.selectById(1)).thenReturn(getEntity());
Map<String, Object> result = accessTokenService.updateToken(1, Integer.MAX_VALUE, getDate(), "token");
Map<String, Object> result = accessTokenService.updateToken(getLoginUser(), 1,Integer.MAX_VALUE,getDate(),"token");
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
// not exist
result = accessTokenService.updateToken(2, Integer.MAX_VALUE, getDate(), "token");
result = accessTokenService.updateToken(getLoginUser(), 2,Integer.MAX_VALUE,getDate(),"token");
logger.info(result.toString());
Assert.assertEquals(Status.ACCESS_TOKEN_NOT_EXIST, result.get(Constants.STATUS));
}
private User getLoginUser(){
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
return loginUser;
}
/**
* create entity
*/

22
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -139,6 +139,10 @@ public class ResourcesServiceTest {
Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
//PARENT_RESOURCE_NOT_EXIST
user.setId(1);
user.setTenantId(1);
Mockito.when(userMapper.selectById(1)).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
Mockito.when(resourcesMapper.selectById(Mockito.anyInt())).thenReturn(null);
result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,1,"/");
@ -159,19 +163,19 @@ public class ResourcesServiceTest {
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
User user = new User();
//HDFS_NOT_STARTUP
Result result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE);
Result result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
//RESOURCE_NOT_EXIST
Mockito.when(resourcesMapper.selectById(1)).thenReturn(getResource());
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
result = resourcesService.updateResource(user,0,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE);
result = resourcesService.updateResource(user,0,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg());
//USER_NO_OPERATION_PERM
result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(),result.getMsg());
@ -186,7 +190,7 @@ public class ResourcesServiceTest {
} catch (IOException e) {
logger.error(e.getMessage(),e);
}
result = resourcesService.updateResource(user, 1, "ResourcesServiceTest1.jar", "ResourcesServiceTest", ResourceType.UDF);
result = resourcesService.updateResource(user, 1, "ResourcesServiceTest1.jar", "ResourcesServiceTest", ResourceType.UDF,null);
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg());
//SUCCESS
@ -199,25 +203,25 @@ public class ResourcesServiceTest {
logger.error(e.getMessage(),e);
}
result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
//RESOURCE_EXIST
Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList());
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg());
//USER_NOT_EXIST
Mockito.when(userMapper.selectById(Mockito.anyInt())).thenReturn(null);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF,null);
logger.info(result.toString());
Assert.assertTrue(Status.USER_NOT_EXIST.getCode() == result.getCode());
//TENANT_NOT_EXIST
Mockito.when(userMapper.selectById(1)).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(null);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF,null);
logger.info(result.toString());
Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(),result.getMsg());
@ -231,7 +235,7 @@ public class ResourcesServiceTest {
logger.error(e.getMessage(),e);
}
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF,null);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());

10
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java

@ -225,13 +225,13 @@ public class UsersServiceTest {
String userPassword = "userTest0001";
try {
//user not exist
Map<String, Object> result = usersService.updateUser(0,userName,userPassword,"3443@qq.com",1,"13457864543","queue", 1);
Map<String, Object> result = usersService.updateUser(getLoginUser(), 0,userName,userPassword,"3443@qq.com",1,"13457864543","queue", 1);
Assert.assertEquals(Status.USER_NOT_EXIST, result.get(Constants.STATUS));
logger.info(result.toString());
//success
when(userMapper.selectById(1)).thenReturn(getUser());
result = usersService.updateUser(1,userName,userPassword,"32222s@qq.com",1,"13457864543","queue", 1);
result = usersService.updateUser(getLoginUser(), 1,userName,userPassword,"32222s@qq.com",1,"13457864543","queue", 1);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
} catch (Exception e) {
@ -357,6 +357,12 @@ public class UsersServiceTest {
}
private User getLoginUser(){
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
return loginUser;
}
@Test
public void getUserInfo(){

2
dolphinscheduler-common/pom.xml

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler</artifactId>
<version>1.3.2-SNAPSHOT</version>
<version>1.3.4-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-common</artifactId>
<name>dolphinscheduler-common</name>

2
dolphinscheduler-dao/pom.xml

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler</artifactId>
<version>1.3.2-SNAPSHOT</version>
<version>1.3.4-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-dao</artifactId>
<name>${project.artifactId}</name>

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java

@ -100,5 +100,12 @@ public interface UdfFuncMapper extends BaseMapper<UdfFunc> {
*/
List<UdfFunc> listAuthorizedUdfByResourceId(@Param("userId") int userId,@Param("resourceIds") int[] resourceIds);
/**
* batch update udf func
* @param udfFuncList udf list
* @return update num
*/
int batchUpdateUdfFunc(@Param("udfFuncList") List<UdfFunc> udfFuncList);
}

177
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -165,9 +166,6 @@ public class DagHelper {
resultList.add(startNode);
}
if (CollectionUtils.isEmpty(depList)) {
if (null != startNode) {
visitedNodeNameList.add(startNode.getName());
}
return resultList;
}
for (String depNodeName : depList) {
@ -252,71 +250,86 @@ public class DagHelper {
return null;
}
/**
* get start vertex in one dag
* it would find the post node if the start vertex is forbidden running
* @param parentNodeName previous node
* the task can be submit when all the depends nodes are forbidden or complete
* @param taskNode taskNode
* @param dag dag
* @param completeTaskList completeTaskList
* @return start Vertex list
* @return can submit
*/
public static Collection<String> getStartVertex(String parentNodeName, DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList){
if(completeTaskList == null){
completeTaskList = new HashMap<>();
public static boolean allDependsForbiddenOrEnd(TaskNode taskNode,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskNode> skipTaskNodeList,
Map<String, TaskInstance> completeTaskList) {
List<String> dependList = taskNode.getDepList();
if (dependList == null) {
return true;
}
Collection<String> startVertexs = null;
if(StringUtils.isNotEmpty(parentNodeName)){
startVertexs = dag.getSubsequentNodes(parentNodeName);
}else{
startVertexs = dag.getBeginNode();
for (String dependNodeName : dependList) {
TaskNode dependNode = dag.getNode(dependNodeName);
if (completeTaskList.containsKey(dependNodeName)
|| dependNode.isForbidden()
|| skipTaskNodeList.containsKey(dependNodeName)) {
continue;
} else {
return false;
}
}
return true;
}
List<String> tmpStartVertexs = new ArrayList<>();
if(startVertexs!= null){
tmpStartVertexs.addAll(startVertexs);
/**
* parse the successor nodes of previous node.
* this function parse the condition node to find the right branch.
* also check all the depends nodes forbidden or complete
* @param preNodeName
* @return successor nodes
*/
public static Set<String> parsePostNodes(String preNodeName,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {
Set<String> postNodeList = new HashSet<>();
Collection<String> startVertexes = new ArrayList<>();
if (preNodeName == null) {
startVertexes = dag.getBeginNode();
} else if (dag.getNode(preNodeName).isConditionsTask()) {
List<String> conditionTaskList = parseConditionTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
startVertexes.addAll(conditionTaskList);
} else {
startVertexes = dag.getSubsequentNodes(preNodeName);
}
for(String start : startVertexs){
TaskNode startNode = dag.getNode(start);
if(!startNode.isForbidden() && !completeTaskList.containsKey(start)){
// the start can be submit if not forbidden and not in complete tasks
for (String subsequent : startVertexes) {
TaskNode taskNode = dag.getNode(subsequent);
if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) {
setTaskNodeSkip(subsequent, dag, completeTaskList, skipTaskNodeList );
continue;
}
// then submit the post nodes
Collection<String> postNodes = getStartVertex(start, dag, completeTaskList);
for(String post : postNodes){
TaskNode postNode = dag.getNode(post);
if(taskNodeCanSubmit(postNode, dag, completeTaskList)){
tmpStartVertexs.add(post);
}
if (!DagHelper.allDependsForbiddenOrEnd(taskNode, dag, skipTaskNodeList, completeTaskList)) {
continue;
}
tmpStartVertexs.remove(start);
if (taskNode.isForbidden() || completeTaskList.containsKey(subsequent)) {
postNodeList.addAll(parsePostNodes(subsequent, skipTaskNodeList, dag, completeTaskList));
continue;
}
postNodeList.add(subsequent);
}
return tmpStartVertexs;
return postNodeList;
}
/**
* the task can be submit when all the depends nodes are forbidden or complete
* @param taskNode taskNode
* @param dag dag
* @param completeTaskList completeTaskList
* @return can submit
* if all of the task dependence are skipped, skip it too.
* @param taskNode
* @return
*/
public static boolean taskNodeCanSubmit(TaskNode taskNode,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {
List<String> dependList = taskNode.getDepList();
if(dependList == null){
return true;
private static boolean isTaskNodeNeedSkip(TaskNode taskNode,
Map<String, TaskNode> skipTaskNodeList
){
if(CollectionUtils.isEmpty(taskNode.getDepList())){
return false;
}
for(String dependNodeName : dependList){
TaskNode dependNode = dag.getNode(dependNodeName);
if(!dependNode.isForbidden() && !completeTaskList.containsKey(dependNodeName)){
for(String depNode : taskNode.getDepList()){
if(!skipTaskNodeList.containsKey(depNode)){
return false;
}
}
@ -324,6 +337,66 @@ public class DagHelper {
}
/**
* parse condition task find the branch process
* set skip flag for another one.
* @param nodeName
* @return
*/
public static List<String> parseConditionTask(String nodeName,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList){
List<String> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeName);
if (!taskNode.isConditionsTask()){
return conditionTaskList;
}
if (!completeTaskList.containsKey(nodeName)){
return conditionTaskList;
}
TaskInstance taskInstance = completeTaskList.get(nodeName);
ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
List<String> skipNodeList = new ArrayList<>();
if(taskInstance.getState().typeIsSuccess()){
conditionTaskList = conditionsParameters.getSuccessNode();
skipNodeList = conditionsParameters.getFailedNode();
}else if(taskInstance.getState().typeIsFailure()){
conditionTaskList = conditionsParameters.getFailedNode();
skipNodeList = conditionsParameters.getSuccessNode();
}else{
conditionTaskList.add(nodeName);
}
for(String failedNode : skipNodeList){
setTaskNodeSkip(failedNode, dag, completeTaskList, skipTaskNodeList);
}
return conditionTaskList;
}
/**
* set task node and the post nodes skip flag
* @param skipNodeName
* @param dag
* @param completeTaskList
* @param skipTaskNodeList
*/
private static void setTaskNodeSkip(String skipNodeName,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList,
Map<String, TaskNode> skipTaskNodeList){
skipTaskNodeList.putIfAbsent(skipNodeName, dag.getNode(skipNodeName));
Collection<String> postNodeList = dag.getSubsequentNodes(skipNodeName);
for(String post : postNodeList){
TaskNode postNode = dag.getNode(post);
if(isTaskNodeNeedSkip(postNode, skipTaskNodeList)){
setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList);
}
}
}
/***
* build dag graph
* @param processDag processDag
@ -383,7 +456,7 @@ public class DagHelper {
*/
public static boolean haveConditionsAfterNode(String parentNodeName,
DAG<String, TaskNode, TaskNodeRelation> dag
){
){
boolean result = false;
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeName);
if(CollectionUtils.isEmpty(subsequentNodes)){

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml

@ -111,4 +111,17 @@
</foreach>
</if>
</select>
<update id="batchUpdateUdfFunc" parameterType="java.util.List">
<foreach collection="udfFuncList" item="udf" index="index" open="" close="" separator =";">
update t_ds_udfs
<set>
resource_name=#{udf.resourceName},
update_time=#{udf.updateTime}
</set>
<where>
id=#{udf.id}
</where>
</foreach>
</update>
</mapper>

2
dolphinscheduler-dist/pom.xml vendored

@ -20,7 +20,7 @@
<parent>
<artifactId>dolphinscheduler</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.3.2-SNAPSHOT</version>
<version>1.3.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

2
dolphinscheduler-microbench/pom.xml

@ -21,7 +21,7 @@
<parent>
<artifactId>dolphinscheduler</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.3.2-SNAPSHOT</version>
<version>1.3.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

2
dolphinscheduler-plugin-api/pom.xml

@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler</artifactId>
<version>1.3.2-SNAPSHOT</version>
<version>1.3.4-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-plugin-api</artifactId>
<name>${project.artifactId}</name>

2
dolphinscheduler-remote/pom.xml

@ -20,7 +20,7 @@
<parent>
<artifactId>dolphinscheduler</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.3.2-SNAPSHOT</version>
<version>1.3.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

10
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -78,6 +78,16 @@ public enum CommandType {
*/
TASK_EXECUTE_RESPONSE,
/**
* db task ack
*/
DB_TASK_ACK,
/**
* db task response
*/
DB_TASK_RESPONSE,
/**
* kill task
*/

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java

@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable;
@ -56,7 +56,7 @@ public class DBTaskAckCommand implements Serializable {
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.DB_TASK_ACK);
byte[] body = FastJsonSerializer.serialize(this);
byte[] body = JsonSerializer.serialize(this);
command.setBody(body);
return command;
}

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java

@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable;
@ -56,7 +56,7 @@ public class DBTaskResponseCommand implements Serializable {
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.DB_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
byte[] body = JsonSerializer.serialize(this);
command.setBody(body);
return command;
}

2
dolphinscheduler-server/pom.xml

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler</artifactId>
<version>1.3.2-SNAPSHOT</version>
<version>1.3.4-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-server</artifactId>
<name>dolphinscheduler-server</name>

159
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
/**
* dependent parameters
*/
private DependentParameters dependentParameters;
/**
* complete task map
*/
private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
/**
* condition result
*/
private DependResult conditionResult;
/**
* constructor of MasterBaseTaskExecThread
*
* @param taskInstance task instance
*/
public ConditionsTaskExecThread(TaskInstance taskInstance) {
super(taskInstance);
taskInstance.setStartTime(new Date());
}
@Override
public Boolean submitWaitComplete() {
try{
this.taskInstance = submit();
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefinitionId(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters();
logger.info("dependent task start");
waitTaskQuit();
updateTaskState();
}catch (Exception e){
logger.error("conditions task run exception" , e);
}
return true;
}
private void waitTaskQuit() {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
taskInstance.getProcessInstanceId()
);
for(TaskInstance task : taskInstances){
completeTaskList.putIfAbsent(task.getName(), task.getState());
}
List<DependResult> modelResultList = new ArrayList<>();
for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){
List<DependResult> itemDependResult = new ArrayList<>();
for(DependentItem item : dependentTaskModel.getDependItemList()){
itemDependResult.add(getDependResultForItem(item));
}
DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
modelResultList.add(modelResult);
}
conditionResult = DependentUtils.getDependResultForRelation(
dependentParameters.getRelation(), modelResultList
);
logger.info("the conditions task depend result : {}", conditionResult);
}
/**
*
*/
private void updateTaskState() {
ExecutionStatus status;
if(this.cancel){
status = ExecutionStatus.KILL;
}else{
status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
}
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
}
private void initTaskParameters() {
this.taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance));
this.taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);
this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), DependentParameters.class);
}
/**
* depend result for depend item
* @param item
* @return
*/
private DependResult getDependResultForItem(DependentItem item){
DependResult dependResult = DependResult.SUCCESS;
if(!completeTaskList.containsKey(item.getDepTasks())){
logger.info("depend item: {} have not completed yet.", item.getDepTasks());
dependResult = DependResult.FAILED;
return dependResult;
}
ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks());
if(executionStatus != item.getStatus()){
logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus(), executionStatus);
dependResult = DependResult.FAILED;
}
logger.info("dependent item complete {} {},{}",
Constants.DEPENDENT_SPLIT, item.getDepTasks(), dependResult);
return dependResult;
}
}

103
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -14,14 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@ -34,6 +37,11 @@ import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
import java.util.Date;
import java.util.concurrent.Callable;
/**
* master task exec base class
@ -81,10 +89,19 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
*/
private TaskPriorityQueue taskUpdateQueue;
/**
* whether need check task time out.
*/
protected boolean checkTimeoutFlag = false;
/**
* task timeout parameters
*/
protected TaskTimeoutParameter taskTimeoutParameter;
/**
* constructor of MasterBaseTaskExecThread
*
* @param taskInstance task instance
* @param taskInstance task instance
*/
public MasterBaseTaskExecThread(TaskInstance taskInstance) {
this.processService = SpringApplicationContext.getBean(ProcessService.class);
@ -93,6 +110,27 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
this.taskInstance = taskInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
initTaskParams();
}
/**
* init task ordinary parameters
*/
private void initTaskParams() {
initTimeoutParams();
}
/**
* init task timeout parameters
*/
private void initTimeoutParams() {
String taskJson = taskInstance.getTaskJson();
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
if(taskTimeoutParameter.getEnable()){
checkTimeoutFlag = true;
}
}
/**
@ -113,7 +151,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* submit master base task exec thread
*
* @return TaskInstance
*/
protected TaskInstance submit() {
@ -154,14 +191,13 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
return task;
}
/**
* dispatcht task
*
* @param taskInstance taskInstance
* @return whether submit task success
*/
public Boolean dispatchTask(TaskInstance taskInstance) {
try{
if(taskInstance.isConditionsTask()
|| taskInstance.isDependTask()
@ -198,7 +234,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
}
/**
* buildTaskPriorityInfo
*
@ -227,7 +262,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* submit wait complete
*
* @return true
*/
protected Boolean submitWaitComplete() {
@ -236,7 +270,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* call
*
* @return boolean
* @throws Exception exception
*/
@ -246,4 +279,56 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
return submitWaitComplete();
}
/**
* alert time out
* @return
*/
protected boolean alertTimeout(){
if( TaskTimeoutStrategy.FAILED == this.taskTimeoutParameter.getStrategy()){
return true;
}
logger.warn("process id:{} process name:{} task id: {},name:{} execution time out",
processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName());
// send warn mail
ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(),processDefine.getReceivers(),
processDefine.getReceiversCc(), processInstance.getId(), processInstance.getName(),
taskInstance.getId(),taskInstance.getName());
return true;
}
/**
* handle time out for time out strategy warn&&failed
*/
protected void handleTimeoutFailed(){
if(TaskTimeoutStrategy.WARN == this.taskTimeoutParameter.getStrategy()){
return;
}
logger.info("process id:{} name:{} task id:{} name:{} cancel because of timeout.",
processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName());
this.cancel = true;
}
/**
* check task remain time valid
* @return
*/
protected boolean checkTaskTimeout(){
if (!checkTimeoutFlag || taskInstance.getStartTime() == null){
return false;
}
long remainTime = getRemainTime(taskTimeoutParameter.getInterval() * 60L);
return remainTime <= 0;
}
/**
* get remain time
*
* @return remain time
*/
protected long getRemainTime(long timeoutSeconds) {
Date startTime = taskInstance.getStartTime();
long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000;
return timeoutSeconds - usedTime;
}
}

191
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -68,6 +68,7 @@ import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@ -348,11 +349,12 @@ public class MasterExecThread implements Runnable {
* @throws Exception exception
*/
private void prepareProcess() throws Exception {
// init task queue
initTaskQueue();
// gen process dag
buildFlowDag();
// init task queue
initTaskQueue();
logger.info("prepare process :{} end", processInstance.getId());
}
@ -407,6 +409,9 @@ public class MasterExecThread implements Runnable {
if(task.isTaskComplete()){
completeTaskList.put(task.getName(), task);
}
if(task.isConditionsTask() || DagHelper.haveConditionsAfterNode(task.getName(), dag)){
continue;
}
if(task.getState().typeIsFailure() && !task.taskCanRetry()){
errorTaskList.put(task.getName(), task);
}
@ -498,6 +503,9 @@ public class MasterExecThread implements Runnable {
// task instance whether alert
taskInstance.setAlertFlag(Flag.NO);
// task instance start time
taskInstance.setStartTime(null);
// task instance flag
taskInstance.setFlag(Flag.YES);
@ -532,132 +540,13 @@ public class MasterExecThread implements Runnable {
return taskInstance;
}
/**
* if all of the task dependence are skip, skip it too.
* @param taskNode
* @return
*/
private boolean isTaskNodeNeedSkip(TaskNode taskNode){
if(CollectionUtils.isEmpty(taskNode.getDepList())){
return false;
}
for(String depNode : taskNode.getDepList()){
if(!skipTaskNodeList.containsKey(depNode)){
return false;
}
}
return true;
}
/**
* set task node skip if dependence all skip
* @param taskNodesSkipList
*/
private void setTaskNodeSkip(List<String> taskNodesSkipList){
for(String skipNode : taskNodesSkipList){
skipTaskNodeList.putIfAbsent(skipNode, dag.getNode(skipNode));
Collection<String> postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList);
List<String> postSkipList = new ArrayList<>();
for(String post : postNodeList){
TaskNode postNode = dag.getNode(post);
if(isTaskNodeNeedSkip(postNode)){
postSkipList.add(post);
}
}
setTaskNodeSkip(postSkipList);
}
}
/**
* parse condition task find the branch process
* set skip flag for another one.
* @param nodeName
* @return
*/
private List<String> parseConditionTask(String nodeName){
List<String> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeName);
if(!taskNode.isConditionsTask()){
return conditionTaskList;
}
ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
TaskInstance taskInstance = completeTaskList.get(nodeName);
if(taskInstance == null){
logger.error("task instance {} cannot find, please check it!", nodeName);
return conditionTaskList;
}
if(taskInstance.getState().typeIsSuccess()){
conditionTaskList = conditionsParameters.getSuccessNode();
setTaskNodeSkip(conditionsParameters.getFailedNode());
}else if(taskInstance.getState().typeIsFailure()){
conditionTaskList = conditionsParameters.getFailedNode();
setTaskNodeSkip(conditionsParameters.getSuccessNode());
}else{
conditionTaskList.add(nodeName);
}
return conditionTaskList;
}
/**
* parse post node list of previous node
* if condition node: return process according to the settings
* if post node completed, return post nodes of the completed node
* @param previousNodeName
* @return
*/
private List<String> parsePostNodeList(String previousNodeName){
List<String> postNodeList = new ArrayList<>();
TaskNode taskNode = dag.getNode(previousNodeName);
if(taskNode != null && taskNode.isConditionsTask()){
return parseConditionTask(previousNodeName);
}
Collection<String> postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList);
List<String> postSkipList = new ArrayList<>();
// delete success node, parse the past nodes
// if conditions node,
// 1. parse the branch process according the conditions setting
// 2. set skip flag on anther branch process
for(String postNode : postNodeCollection){
if(completeTaskList.containsKey(postNode)){
TaskInstance postTaskInstance = completeTaskList.get(postNode);
if(dag.getNode(postNode).isConditionsTask()){
List<String> conditionTaskNodeList = parseConditionTask(postNode);
for(String conditions : conditionTaskNodeList){
postNodeList.addAll(parsePostNodeList(conditions));
}
}else if(postTaskInstance.getState().typeIsSuccess()){
postNodeList.addAll(parsePostNodeList(postNode));
}else{
postNodeList.add(postNode);
}
}else if(isTaskNodeNeedSkip(dag.getNode(postNode))){
postSkipList.add(postNode);
setTaskNodeSkip(postSkipList);
postSkipList.clear();
}else{
postNodeList.add(postNode);
}
}
return postNodeList;
}
/**
* submit post node
* @param parentNodeName parent node name
*/
private Map<String,Object> propToValue = new ConcurrentHashMap<String, Object>();
private void submitPostNode(String parentNodeName){
List<String> submitTaskNodeList = parsePostNodeList(parentNodeName);
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
List<TaskInstance> taskInstances = new ArrayList<>();
for(String taskNode : submitTaskNodeList){
try {
@ -702,7 +591,6 @@ public class MasterExecThread implements Runnable {
if(startNodes.contains(taskName)){
return DependResult.SUCCESS;
}
TaskNode taskNode = dag.getNode(taskName);
List<String> depNameList = taskNode.getDepList();
for(String depsNode : depNameList ){
@ -716,23 +604,42 @@ public class MasterExecThread implements Runnable {
return DependResult.WAITING;
}
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
// conditions task would not return failed.
if(depTaskState.typeIsFailure()
&& !DagHelper.haveConditionsAfterNode(depsNode, dag )
&& !dag.getNode(depsNode).isConditionsTask()){
return DependResult.FAILED;
}
if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){
return DependResult.WAITING;
}
// ignore task state if current task is condition
if(taskNode.isConditionsTask()){
continue;
}
if(!dependTaskSuccess(depsNode, taskName)){
return DependResult.FAILED;
}
}
logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray()));
return DependResult.SUCCESS;
}
/**
* depend node is completed, but here need check the condition task branch is the next node
* @param dependNodeName
* @param nextNodeName
* @return
*/
private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){
if(dag.getNode(dependNodeName).isConditionsTask()){
//condition task need check the branch to run
List<String> nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeList, dag, completeTaskList);
if(!nextTaskList.contains(nextNodeName)){
return false;
}
}else {
ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState();
if(depTaskState.typeIsFailure()){
return false;
}
}
return true;
}
/**
* query task instance by complete state
@ -889,6 +796,24 @@ public class MasterExecThread implements Runnable {
return state;
}
/**
* whether standby task list have retry tasks
* @return
*/
private boolean retryTaskExists() {
boolean result = false;
for(String taskName : readyToSubmitTaskList.keySet()){
TaskInstance task = readyToSubmitTaskList.get(taskName);
if(task.getState().typeIsFailure()){
result = true;
break;
}
}
return result;
}
/**
* whether complement end
* @return Boolean whether is complement end
@ -976,7 +901,7 @@ public class MasterExecThread implements Runnable {
// submit start node
submitPostNode(null);
boolean sendTimeWarning = false;
while(!processInstance.isProcessInstanceStop()){
while(!processInstance.isProcessInstanceStop() && Stopper.isRunning()){
// send warning email if process time out.
if(!sendTimeWarning && checkProcessTimeOut(processInstance) ){

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -202,6 +202,9 @@ public abstract class AbstractCommandExecutor {
return result;
}
public String getVarPool() {
return varPool.toString();
}
/**
* cancel application

2
dolphinscheduler-service/pom.xml

@ -20,7 +20,7 @@
<parent>
<artifactId>dolphinscheduler</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.3.2-SNAPSHOT</version>
<version>1.3.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

59
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -79,6 +79,8 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.commons.lang.ArrayUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
@ -358,32 +360,39 @@ public class ProcessService {
* remove task log file
* @param processInstanceId processInstanceId
*/
public void removeTaskLogFile(Integer processInstanceId) {
public void removeTaskLogFile(Integer processInstanceId){
LogClientService logClient = new LogClientService();
LogClientService logClient = null;
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
try {
logClient = new LogClientService();
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
for (TaskInstance taskInstance : taskInstanceList) {
String taskLogPath = taskInstance.getLogPath();
if (StringUtils.isEmpty(taskInstance.getHost())) {
continue;
for (TaskInstance taskInstance : taskInstanceList) {
String taskLogPath = taskInstance.getLogPath();
if (StringUtils.isEmpty(taskInstance.getHost())) {
continue;
}
int port = Constants.RPC_PORT;
String ip = "";
try {
ip = Host.of(taskInstance.getHost()).getIp();
} catch (Exception e) {
// compatible old version
ip = taskInstance.getHost();
}
// remove task log from loggerserver
logClient.removeTaskLog(ip, port, taskLogPath);
}
int port = Constants.RPC_PORT;
String ip = "";
try {
ip = Host.of(taskInstance.getHost()).getIp();
} catch (Exception e) {
// compatible old version
ip = taskInstance.getHost();
}finally {
if (logClient != null) {
logClient.close();
}
// remove task log from loggerserver
logClient.removeTaskLog(ip,port,taskLogPath);
}
}
@ -457,6 +466,7 @@ public class ProcessService {
processInstance.getWarningType(),
processInstance.getWarningGroupId(),
processInstance.getScheduleTime(),
processInstance.getWorkerGroup(),
processInstance.getProcessInstancePriority()
);
saveCommand(command);
@ -1031,6 +1041,7 @@ public class ProcessService {
parentProcessInstance.getWarningType(),
parentProcessInstance.getWarningGroupId(),
parentProcessInstance.getScheduleTime(),
task.getWorkerGroup(),
parentProcessInstance.getProcessInstancePriority()
);
}
@ -1641,8 +1652,10 @@ public class ProcessService {
* @param resourceType resource type
* @return tenant code
*/
public String queryTenantCodeByResName(String resName,ResourceType resourceType) {
return resourceMapper.queryTenantCodeByResourceName(resName, resourceType.ordinal());
public String queryTenantCodeByResName(String resName,ResourceType resourceType){
// in order to query tenant code successful although the version is older
String fullName = resName.startsWith("/") ? resName : String.format("/%s",resName);
return resourceMapper.queryTenantCodeByResourceName(fullName, resourceType.ordinal());
}
/**
@ -1679,7 +1692,7 @@ public class ProcessService {
*/
public List<CycleDependency> getCycleDependencies(int masterId,int[] ids,Date scheduledFireTime) throws Exception {
List<CycleDependency> cycleDependencyList = new ArrayList<CycleDependency>();
if (ids == null || ids.length == 0) {
if(ArrayUtils.isEmpty(ids)){
logger.warn("ids[] is empty!is invalid!");
return cycleDependencyList;
}

2
dolphinscheduler-ui/pom.xml

@ -20,7 +20,7 @@
<parent>
<artifactId>dolphinscheduler</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.3.2-SNAPSHOT</version>
<version>1.3.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

2
pom.xml

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler</artifactId>
<version>1.3.2-SNAPSHOT</version>
<version>1.3.4-SNAPSHOT</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<url>http://dolphinscheduler.apache.org</url>

Loading…
Cancel
Save