Browse Source

Merge pull request #7 from apache/dev

update code
pull/2/head
BoYiZhang 5 years ago committed by GitHub
parent
commit
b28b4adfa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      docker/postgres/docker-entrypoint-initdb/init.sql
  2. 25
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/UsersController.java
  3. 51
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
  4. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java
  5. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  6. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java
  7. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/security/PasswordAuthenticator.java
  8. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  9. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  10. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  11. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java
  12. 173
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
  13. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/LoginControllerTest.java
  14. 6
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptorTest.java
  15. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/security/PasswordAuthenticatorTest.java
  16. 143
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  17. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  18. 18
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
  19. 116
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
  20. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
  21. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
  22. 11
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  23. 86
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32.java
  24. 38
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java
  25. 89
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java
  26. 18
      dolphinscheduler-common/src/main/resources/common.properties
  27. 20
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstanceMap.java
  28. 16
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Queue.java
  29. 20
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Session.java
  30. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Tenant.java
  31. 14
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/User.java
  32. 47
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
  33. 54
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
  34. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/MysqlPerformance.java
  35. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UserMapper.xml
  36. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  37. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  38. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  39. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  40. 5
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
  41. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
  42. 11
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  43. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
  44. 10
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/details/index.vue
  45. 29
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue
  46. 8
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue
  47. 28
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/list.vue
  48. 9
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/index.vue
  49. 8
      dolphinscheduler-ui/src/js/conf/home/router/index.js
  50. 9
      dolphinscheduler-ui/src/js/module/components/secondaryMenu/_source/menu.js
  51. 6
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  52. 6
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  53. 2
      e2e/src/test/java/org/apache/dolphinscheduler/locator/security/UserManageLocator.java
  54. 1
      sql/dolphinscheduler_mysql.sql
  55. 2
      sql/soft_version
  56. 21
      sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
  57. 20
      sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

3
docker/postgres/docker-entrypoint-initdb/init.sql

@ -623,6 +623,7 @@ CREATE TABLE t_ds_user (
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
queue varchar(64) DEFAULT NULL ,
state int DEFAULT 1 ,
PRIMARY KEY (id)
);
@ -749,7 +750,7 @@ ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_
-- Records of t_ds_user?user : admin , password : dolphinscheduler123
INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22');
INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,state,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', 1, '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22');
-- Records of t_ds_alertgroup,dolphinscheduler warning group
INSERT INTO t_ds_alertgroup(group_name,group_type,description,create_time,update_time) VALUES ('dolphinscheduler warning group', '0', 'dolphinscheduler warning group','2018-11-29 10:20:39', '2018-11-29 10:20:39');

25
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/UsersController.java

@ -72,7 +72,8 @@ public class UsersController extends BaseController {
@ApiImplicitParam(name = "tenantId", value = "TENANT_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "queue", value = "QUEUE", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "email", value = "EMAIL", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "phone", value = "PHONE", dataType = "Int", example = "100")
@ApiImplicitParam(name = "phone", value = "PHONE", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "state", value = "STATE", dataType = "Int", example = "1")
})
@PostMapping(value = "/create")
@ResponseStatus(HttpStatus.CREATED)
@ -83,11 +84,11 @@ public class UsersController extends BaseController {
@RequestParam(value = "tenantId") int tenantId,
@RequestParam(value = "queue", required = false, defaultValue = "") String queue,
@RequestParam(value = "email") String email,
@RequestParam(value = "phone", required = false) String phone) throws Exception {
logger.info("login user {}, create user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}",
loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone, queue);
Map<String, Object> result = usersService.createUser(loginUser, userName, userPassword, email, tenantId, phone, queue);
@RequestParam(value = "phone", required = false) String phone,
@RequestParam(value = "state", required = false) int state) throws Exception {
logger.info("login user {}, create user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}, state: {}",
loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone, queue, state);
Map<String, Object> result = usersService.createUser(loginUser, userName, userPassword, email, tenantId, phone, queue, state);
return returnDataList(result);
}
@ -146,7 +147,8 @@ public class UsersController extends BaseController {
@ApiImplicitParam(name = "tenantId", value = "TENANT_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "queue", value = "QUEUE", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "email", value = "EMAIL", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "phone", value = "PHONE", dataType = "Int", example = "100")
@ApiImplicitParam(name = "phone", value = "PHONE", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "state", value = "STATE", dataType = "Int", example = "1")
})
@PostMapping(value = "/update")
@ResponseStatus(HttpStatus.OK)
@ -158,10 +160,11 @@ public class UsersController extends BaseController {
@RequestParam(value = "queue", required = false, defaultValue = "") String queue,
@RequestParam(value = "email") String email,
@RequestParam(value = "tenantId") int tenantId,
@RequestParam(value = "phone", required = false) String phone) throws Exception {
logger.info("login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}",
loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone, queue);
Map<String, Object> result = usersService.updateUser(id, userName, userPassword, email, tenantId, phone, queue);
@RequestParam(value = "phone", required = false) String phone,
@RequestParam(value = "state", required = false) int state) throws Exception {
logger.info("login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}, state: {}",
loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone, queue, state);
Map<String, Object> result = usersService.updateUser(id, userName, userPassword, email, tenantId, phone, queue, state);
return returnDataList(result);
}

51
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java

@ -52,35 +52,7 @@ public class WorkerGroupController extends BaseController {
WorkerGroupService workerGroupService;
/**
* create or update a worker group
*
* @param loginUser login user
* @param id worker group id
* @param name worker group name
* @param ipList ip list
* @return create or update result code
*/
@ApiOperation(value = "saveWorkerGroup", notes = "CREATE_WORKER_GROUP_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"),
@ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "ipList", value = "WORKER_IP_LIST", required = true, dataType = "String")
})
@PostMapping(value = "/save")
@ResponseStatus(HttpStatus.OK)
@ApiException(SAVE_ERROR)
public Result saveWorkerGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id", required = false, defaultValue = "0") int id,
@RequestParam(value = "name") String name,
@RequestParam(value = "ipList") String ipList
) {
logger.info("save worker group: login user {}, id:{}, name: {}, ipList: {} ",
loginUser.getUserName(), id, name, ipList);
Map<String, Object> result = workerGroupService.saveWorkerGroup(loginUser, id, name, ipList);
return returnDataList(result);
}
/**
* query worker groups paging
@ -132,28 +104,5 @@ public class WorkerGroupController extends BaseController {
return returnDataList(result);
}
/**
* delete worker group by id
*
* @param loginUser login user
* @param id group id
* @return delete result code
*/
@ApiOperation(value = "deleteById", notes = "DELETE_WORKER_GROUP_BY_ID_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", required = true, dataType = "Int", example = "10"),
})
@GetMapping(value = "/delete-by-id")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_WORKER_GROUP_FAIL)
public Result deleteById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("id") Integer id
) {
logger.info("delete worker group: login user {}, id:{} ",
loginUser.getUserName(), id);
Map<String, Object> result = workerGroupService.deleteWorkerGroupById(id);
return returnDataList(result);
}
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java

@ -50,6 +50,7 @@ public class ResourceTreeVisitor implements Visitor{
* visit
* @return resoruce component
*/
@Override
public ResourceComponent visit() {
ResourceComponent rootDirectory = new Directory();
for (Resource resource : resourceList) {
@ -117,6 +118,7 @@ public class ResourceTreeVisitor implements Visitor{
}else{
tempResourceComponent = new FileLeaf();
}
tempResourceComponent.setName(resource.getAlias());
tempResourceComponent.setFullName(resource.getFullName().replaceFirst("/",""));
tempResourceComponent.setId(resource.getId());

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -175,6 +175,7 @@ public enum Status {
QUERY_WORKER_GROUP_FAIL(10146,"query worker group fail ", "查询worker分组失败"),
DELETE_WORKER_GROUP_FAIL(10147,"delete worker group fail ", "删除worker分组失败"),
COPY_PROCESS_DEFINITION_ERROR(10148,"copy process definition error", "复制工作流错误"),
USER_DISABLED(10149,"The current user is disabled", "当前用户已停用"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java

@ -16,9 +16,11 @@
*/
package org.apache.dolphinscheduler.api.interceptor;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.security.Authenticator;
import org.apache.dolphinscheduler.api.service.SessionService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.commons.httpclient.HttpStatus;
@ -85,6 +87,14 @@ public class LoginHandlerInterceptor implements HandlerInterceptor {
return false;
}
}
// check user state
if (user.getState() == Flag.NO.ordinal()) {
response.setStatus(HttpStatus.SC_UNAUTHORIZED);
logger.info(Status.USER_DISABLED.getMsg());
return false;
}
request.setAttribute(Constants.SESSION_USER, user);
return true;
}

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/security/PasswordAuthenticator.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.service.SessionService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.Session;
import org.apache.dolphinscheduler.dao.entity.User;
import org.slf4j.Logger;
@ -49,6 +50,13 @@ public class PasswordAuthenticator implements Authenticator {
return result;
}
// check user state
if (user.getState() == Flag.NO.ordinal()) {
result.setCode(Status.USER_DISABLED.getCode());
result.setMsg(Status.USER_DISABLED.getMsg());
return result;
}
// create session
String sessionId = sessionService.createSession(user, extra);
if (sessionId == null) {

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -96,9 +96,6 @@ public class ProcessDefinitionService extends BaseDAGService {
@Autowired
private ProcessService processService;
@Autowired
private WorkerGroupMapper workerGroupMapper;
/**
* create process definition
*
@ -782,6 +779,16 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefinitionName, 1);
}
//unique check
Map<String, Object> checkResult = verifyProcessDefinitionName(loginUser, currentProjectName, processDefinitionName);
Status status = (Status) checkResult.get(Constants.STATUS);
if (Status.SUCCESS.equals(status)) {
putMsg(result, Status.SUCCESS);
} else {
result.putAll(checkResult);
return false;
}
// get create process result
Map<String, Object> createProcessResult =
getCreateProcessResult(loginUser,

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -91,8 +91,7 @@ public class ProcessInstanceService extends BaseDAGService {
@Autowired
LoggerService loggerService;
@Autowired
WorkerGroupMapper workerGroupMapper;
@Autowired
UsersService usersService;

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -104,7 +104,7 @@ public class ResourcesService extends BaseService {
putMsg(result, Status.HDFS_NOT_STARTUP);
return result;
}
String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
String fullName = "/".equals(currentDir) ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
if (pid != -1) {
Resource parentResource = resourcesMapper.selectById(pid);
@ -229,7 +229,7 @@ public class ResourcesService extends BaseService {
}
// check resoure name exists
String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
String fullName = "/".equals(currentDir) ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
if (checkResourceExists(fullName, 0, type.ordinal())) {
logger.error("resource {} has exist, can't recreate", name);
putMsg(result, Status.RESOURCE_EXIST);
@ -839,7 +839,7 @@ public class ResourcesService extends BaseService {
}
String name = fileName.trim() + "." + nameSuffix;
String fullName = currentDirectory.equals("/") ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name);
String fullName = "/".equals(currentDirectory) ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name);
result = verifyResourceName(fullName,type,loginUser);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java

@ -94,7 +94,8 @@ public class UsersService extends BaseService {
String email,
int tenantId,
String phone,
String queue) throws Exception {
String queue,
int state) throws Exception {
Map<String, Object> result = new HashMap<>(5);
@ -115,7 +116,7 @@ public class UsersService extends BaseService {
return result;
}
User user = createUser(userName, userPassword, email, tenantId, phone, queue);
User user = createUser(userName, userPassword, email, tenantId, phone, queue, state);
Tenant tenant = tenantMapper.queryById(tenantId);
// resource upload startup
@ -139,7 +140,8 @@ public class UsersService extends BaseService {
String email,
int tenantId,
String phone,
String queue) throws Exception {
String queue,
int state) throws Exception {
User user = new User();
Date now = new Date();
@ -148,6 +150,7 @@ public class UsersService extends BaseService {
user.setEmail(email);
user.setTenantId(tenantId);
user.setPhone(phone);
user.setState(state);
// create general users, administrator users are currently built-in
user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(now);
@ -260,7 +263,8 @@ public class UsersService extends BaseService {
String email,
int tenantId,
String phone,
String queue) throws Exception {
String queue,
int state) throws Exception {
Map<String, Object> result = new HashMap<>(5);
result.put(Constants.STATUS, false);
@ -309,7 +313,9 @@ public class UsersService extends BaseService {
}
user.setPhone(phone);
}
user.setQueue(queue);
user.setState(state);
Date now = new Date();
user.setUpdateTime(now);

173
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java

@ -16,24 +16,24 @@
*/
package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.AccessToken;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.stream.Collectors;
/**
* work group service
@ -42,90 +42,13 @@ import java.util.*;
public class WorkerGroupService extends BaseService {
@Autowired
WorkerGroupMapper workerGroupMapper;
@Autowired
ProcessInstanceMapper processInstanceMapper;
@Autowired
protected ZookeeperCachedOperator zookeeperCachedOperator;
/**
* create or update a worker group
*
* @param loginUser login user
* @param id worker group id
* @param name worker group name
* @param ipList ip list
* @return create or update result code
*/
public Map<String, Object> saveWorkerGroup(User loginUser,int id, String name, String ipList){
Map<String, Object> result = new HashMap<>(5);
//only admin can operate
if (checkAdmin(loginUser, result)){
return result;
}
if(StringUtils.isEmpty(name)){
putMsg(result, Status.NAME_NULL);
return result;
}
Date now = new Date();
WorkerGroup workerGroup = null;
if(id != 0){
workerGroup = workerGroupMapper.selectById(id);
//check exist
if (workerGroup == null){
workerGroup = new WorkerGroup();
workerGroup.setCreateTime(now);
}
}else{
workerGroup = new WorkerGroup();
workerGroup.setCreateTime(now);
}
workerGroup.setName(name);
workerGroup.setIpList(ipList);
workerGroup.setUpdateTime(now);
if(checkWorkerGroupNameExists(workerGroup)){
putMsg(result, Status.NAME_EXIST, workerGroup.getName());
return result;
}
if(workerGroup.getId() != 0 ){
workerGroupMapper.updateById(workerGroup);
}else{
workerGroupMapper.insert(workerGroup);
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* check worker group name exists
* @param workerGroup
* @return
*/
private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) {
List<WorkerGroup> workerGroupList = workerGroupMapper.queryWorkerGroupByName(workerGroup.getName());
if(CollectionUtils.isNotEmpty(workerGroupList)){
// new group has same name..
if(workerGroup.getId() == 0){
return true;
}
// update group...
for(WorkerGroup group : workerGroupList){
if(group.getId() != workerGroup.getId()){
return true;
}
}
}
return false;
}
/**
* query worker group paging
@ -138,66 +61,100 @@ public class WorkerGroupService extends BaseService {
*/
public Map<String,Object> queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal) {
// list from index
Integer fromIndex = (pageNo - 1) * pageSize;
// list to index
Integer toIndex = (pageNo - 1) * pageSize + pageSize;
Map<String, Object> result = new HashMap<>(5);
if (checkAdmin(loginUser, result)) {
return result;
}
Page<WorkerGroup> page = new Page(pageNo, pageSize);
IPage<WorkerGroup> workerGroupIPage = workerGroupMapper.queryListPaging(
page, searchVal);
List<WorkerGroup> workerGroups = getWorkerGroups(true);
List<WorkerGroup> resultDataList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(workerGroups)){
List<WorkerGroup> searchValDataList = new ArrayList<>();
if (StringUtils.isNotEmpty(searchVal)){
for (WorkerGroup workerGroup : workerGroups){
if (workerGroup.getName().contains(searchVal)){
searchValDataList.add(workerGroup);
}
}
}else {
searchValDataList = workerGroups;
}
if (searchValDataList.size() < pageSize){
toIndex = (pageNo - 1) * pageSize + searchValDataList.size();
}
resultDataList = searchValDataList.subList(fromIndex, toIndex);
}
PageInfo<WorkerGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotalCount((int)workerGroupIPage.getTotal());
pageInfo.setLists(workerGroupIPage.getRecords());
pageInfo.setTotalCount(resultDataList.size());
pageInfo.setLists(resultDataList);
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* delete worker group by id
* @param id worker group id
* @return delete result code
* query all worker group
*
* @return all worker group list
*/
@Transactional(rollbackFor = Exception.class)
public Map<String,Object> deleteWorkerGroupById(Integer id) {
public Map<String,Object> queryAllGroup() {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
List<WorkerGroup> workerGroups = getWorkerGroups(false);
List<ProcessInstance> processInstances = processInstanceMapper.queryByWorkerGroupIdAndStatus(id, Constants.NOT_TERMINATED_STATES);
if(CollectionUtils.isNotEmpty(processInstances)){
putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size());
return result;
}
workerGroupMapper.deleteById(id);
processInstanceMapper.updateProcessInstanceByWorkerGroupId(id, Constants.DEFAULT_WORKER_ID);
Set<String> availableWorkerGroupSet = workerGroups.stream()
.map(workerGroup -> workerGroup.getName())
.collect(Collectors.toSet());
result.put(Constants.DATA_LIST, availableWorkerGroupSet);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query all worker group
* get worker groups
*
* @return all worker group list
* @param isPaging whether paging
* @return WorkerGroup list
*/
public Map<String,Object> queryAllGroup() {
Map<String, Object> result = new HashMap<>();
private List<WorkerGroup> getWorkerGroups(boolean isPaging) {
String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker";
List<String> workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
// available workerGroup list
List<String> availableWorkerGroupList = new ArrayList<>();
List<WorkerGroup> workerGroups = new ArrayList<>();
for (String workerGroup : workerGroupList){
String workerGroupPath= workerPath + "/" + workerGroup;
List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
if (CollectionUtils.isNotEmpty(childrenNodes)){
availableWorkerGroupList.add(workerGroup);
WorkerGroup wg = new WorkerGroup();
wg.setName(workerGroup);
if (isPaging){
wg.setIpList(childrenNodes);
String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0));
wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[3]));
wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[4]));
}
workerGroups.add(wg);
}
result.put(Constants.DATA_LIST, availableWorkerGroupList);
putMsg(result, Status.SUCCESS);
return result;
}
return workerGroups;
}
}

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/LoginControllerTest.java

@ -56,7 +56,6 @@ public class LoginControllerTest extends AbstractControllerTest{
logger.info(mvcResult.getResponse().getContentAsString());
}
@Test
public void testSignOut() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();

6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptorTest.java

@ -57,6 +57,7 @@ public class LoginHandlerInterceptorTest {
User mockUser = new User();
mockUser.setId(1);
mockUser.setUserType(UserType.GENERAL_USER);
mockUser.setState(1);
// test no token
when(authenticator.getAuthUser(request)).thenReturn(mockUser);
@ -67,5 +68,10 @@ public class LoginHandlerInterceptorTest {
when(request.getHeader("token")).thenReturn(token);
when(userMapper.queryUserByToken(token)).thenReturn(mockUser);
Assert.assertTrue(interceptor.preHandle(request, response, null));
// test disable user
mockUser.setState(0);
when(authenticator.getAuthUser(request)).thenReturn(mockUser);
Assert.assertFalse(interceptor.preHandle(request, response, null));
}
}

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/security/PasswordAuthenticatorTest.java

@ -67,6 +67,7 @@ public class PasswordAuthenticatorTest {
mockUser.setEmail("test@test.com");
mockUser.setUserPassword("test");
mockUser.setId(1);
mockUser.setState(1);
mockSession = new Session();
mockSession.setId(UUID.randomUUID().toString());
@ -82,6 +83,13 @@ public class PasswordAuthenticatorTest {
Result result = authenticator.authenticate("test", "test", "127.0.0.1");
Assert.assertEquals(Status.SUCCESS.getCode(), (int) result.getCode());
logger.info(result.toString());
mockUser.setState(0);
when(usersService.queryUser("test", "test")).thenReturn(mockUser);
when(sessionService.createSession(mockUser, "127.0.0.1")).thenReturn(mockSession.getId());
Result result1 = authenticator.authenticate("test", "test", "127.0.0.1");
Assert.assertEquals(Status.USER_DISABLED.getCode(), (int) result1.getCode());
logger.info(result1.toString());
}
@Test

143
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -16,7 +16,6 @@
*/
package org.apache.dolphinscheduler.api.service;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
@ -29,9 +28,7 @@ import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import org.apache.http.entity.ContentType;
import org.json.JSONException;
import org.junit.Assert;
@ -41,12 +38,8 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.quartz.Scheduler;
import org.skyscreamer.jsonassert.JSONAssert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.web.multipart.MultipartFile;
@ -59,7 +52,6 @@ import java.util.*;
@RunWith(MockitoJUnitRunner.Silent.class)
@SpringBootTest(classes = ApiApplicationServer.class)
public class ProcessDefinitionServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionServiceTest.class);
@InjectMocks
ProcessDefinitionService processDefinitionService;
@ -79,8 +71,7 @@ public class ProcessDefinitionServiceTest {
@Mock
private ScheduleMapper scheduleMapper;
@Mock
private WorkerGroupMapper workerGroupMapper;
@Mock
private ProcessService processService;
@ -530,7 +521,6 @@ public class ProcessDefinitionServiceTest {
@Test
public void testExportProcessMetaDataStr() {
Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(getSchedulerList());
Mockito.when(workerGroupMapper.selectById(-1)).thenReturn(null);
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(sqlDependentJson);
@ -573,17 +563,14 @@ public class ProcessDefinitionServiceTest {
WorkerGroup workerGroup = new WorkerGroup();
workerGroup.setName("ds-test-workergroup");
workerGroup.setId(2);
List<WorkerGroup> workerGroups = new ArrayList<>();
workerGroups.add(workerGroup);
Mockito.when(workerGroupMapper.queryWorkerGroupByName("ds-test")).thenReturn(workerGroups);
processMetaCron.setScheduleWorkerGroupName("ds-test");
int insertFlagWorker = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron,
processDefinitionName, processDefinitionId);
Assert.assertEquals(0, insertFlagWorker);
Mockito.when(workerGroupMapper.queryWorkerGroupByName("ds-test")).thenReturn(null);
int workerNullFlag = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron,
processDefinitionName, processDefinitionId);
Assert.assertEquals(0, workerNullFlag);
@ -659,7 +646,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "shell-4")).thenReturn(null);
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "testProject")).thenReturn(shellDefinition2);
processDefinitionService.importSubProcess(loginUser,testProject,jsonArray,subProcessIdMap);
processDefinitionService.importSubProcess(loginUser,testProject, jsonArray, subProcessIdMap);
String correctSubJson = jsonArray.toString();
@ -667,60 +654,32 @@ public class ProcessDefinitionServiceTest {
}
@Test
public void testCreateProcess() throws IOException{
String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
String projectName = "test";
String name = "dag_test";
String description = "desc test";
String connects = "[]";
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.SUCCESS);
result.put("processDefinitionId",1);
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
Project project = getProject(projectName);
//project not found
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
Mockito.when(processDefineMapper.insert(getProcessDefinition())).thenReturn(1);
Map<String, Object> result1 = processDefinitionService.createProcessDefinition(loginUser,projectName,name,json,description,locations,connects);
Assert.assertEquals(Status.SUCCESS,result1.get(Constants.STATUS));
}
@Test
public void testImportProcessDefinitionById() throws IOException {
String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
String projectName = "test";
String name = "dag_test";
String description = "desc test";
String connects = "[]";
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.SUCCESS);
result.put("processDefinitionId",1);
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
Project project = getProject(projectName);
//project not found
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
Mockito.when(processDefineMapper.insert(getProcessDefinition())).thenReturn(1);
Map<String, Object> result1 = processDefinitionService.createProcessDefinition(loginUser,projectName,name,json,description,locations,connects);
String processJson = "[{\"processDefinitionConnects\":\"[]\",\"processDefinitionJson\":\"{\\\"tenantId\\\":-1,\\\"globalParams\\\":[],\\\"tasks\\\":[{\\\"workerGroupId\\\":-1,\\\"runFlag\\\":\\\"NORMAL\\\",\\\"type\\\":\\\"SHELL\\\",\\\"params\\\":{\\\"rawScript\\\":\\\"aa=\\\\\\\"1234\\\\\\\"\\\\necho ${aa}\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]},\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"},\\\"maxRetryTimes\\\":\\\"0\\\",\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"ssh_test1\\\",\\\"dependence\\\":{},\\\"retryInterval\\\":\\\"1\\\",\\\"preTasks\\\":[],\\\"id\\\":\\\"tasks-36196\\\",\\\"desc\\\":\\\"\\\"}],\\\"timeout\\\":0}\",\"processDefinitionLocations\":\"{\\\"tasks-36196\\\":{\\\"name\\\":\\\"ssh_test1\\\",\\\"targetarr\\\":\\\"\\\",\\\"x\\\":141,\\\"y\\\":70}}\",\"processDefinitionName\":\"dag_test\",\"projectName\":\"test\"}]";
String processJson = "[{\"projectName\":\"testProject\",\"processDefinitionName\":\"shell-4\"," +
"\"processDefinitionJson\":\"{\\\"tenantId\\\":1,\\\"globalParams\\\":[]," +
"\\\"tasks\\\":[{\\\"workerGroupId\\\":\\\"default\\\",\\\"description\\\":\\\"\\\",\\\"runFlag\\\":\\\"NORMAL\\\"," +
"\\\"type\\\":\\\"SHELL\\\",\\\"params\\\":{\\\"rawScript\\\":\\\"#!/bin/bash\\\\necho \\\\\\\"shell-4\\\\\\\"\\\"," +
"\\\"localParams\\\":[],\\\"resourceList\\\":[]},\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}," +
"\\\"maxRetryTimes\\\":\\\"0\\\",\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-4\\\"," +
"\\\"dependence\\\":{},\\\"retryInterval\\\":\\\"1\\\",\\\"preTasks\\\":[],\\\"id\\\":\\\"tasks-84090\\\"}," +
"{\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-5\\\",\\\"workerGroupId\\\":\\\"default\\\\," +
"\\\"description\\\":\\\"\\\",\\\"dependence\\\":{},\\\"preTasks\\\":[\\\"shell-4\\\"],\\\"id\\\":\\\"tasks-87364\\\"," +
"\\\"runFlag\\\":\\\"NORMAL\\\",\\\"type\\\":\\\"SUB_PROCESS\\\",\\\"params\\\":{\\\"processDefinitionId\\\":46}," +
"\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}}],\\\"timeout\\\":0}\"," +
"\"processDefinitionDescription\":\"\",\"processDefinitionLocations\":\"{\\\"tasks-84090\\\":{\\\"name\\\":\\\"shell-4\\\"," +
"\\\"targetarr\\\":\\\"\\\",\\\"x\\\":128,\\\"y\\\":114},\\\"tasks-87364\\\":{\\\"name\\\":\\\"shell-5\\\"," +
"\\\"targetarr\\\":\\\"tasks-84090\\\",\\\"x\\\":266,\\\"y\\\":115}}\"," +
"\"processDefinitionConnects\":\"[{\\\"endPointSourceId\\\":\\\"tasks-84090\\\"," +
"\\\"endPointTargetId\\\":\\\"tasks-87364\\\"}]\"}]";
String subProcessJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-52423\",\"name\":\"shell-5\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo \\\"shell-5\\\"\"},\"description\":\"\"," +
"\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":\\\"default\\\\," +
"\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
FileUtils.writeStringToFile(new File("/tmp/task.json"),processJson);
@ -731,37 +690,45 @@ public class ProcessDefinitionServiceTest {
MultipartFile multipartFile = new MockMultipartFile(file.getName(), file.getName(),
ContentType.APPLICATION_OCTET_STREAM.toString(), fileInputStream);
String currentProjectName = "test";
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
String currentProjectName = "testProject";
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.SUCCESS, currentProjectName);
ProcessDefinition shellDefinition2 = new ProcessDefinition();
shellDefinition2.setId(25);
shellDefinition2.setName("B");
shellDefinition2.setProjectId(1);
shellDefinition2.setId(46);
shellDefinition2.setName("shell-5");
shellDefinition2.setProjectId(2);
shellDefinition2.setProcessDefinitionJson(subProcessJson);
Mockito.when(projectMapper.queryByName(currentProjectName)).thenReturn(getProject(currentProjectName));
Mockito.when(projectService.checkProjectAndAuth(loginUser, getProject(currentProjectName), currentProjectName)).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineId(25)).thenReturn(shellDefinition2);
Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2);
//import process
Map<String, Object> importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName);
Assert.assertEquals(Status.SUCCESS, importProcessResult.get(Constants.STATUS));
boolean delete = file.delete();
Assert.assertTrue(delete);
String processMetaJson = "[]";
importProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
// Map<String, Object> importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName);
//
// Assert.assertEquals(Status.SUCCESS, importProcessResult.get(Constants.STATUS));
//
// boolean delete = file.delete();
//
processMetaJson = "[{\"scheduleWorkerGroupId\":-1}]";
importProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
// Assert.assertTrue(delete);
processMetaJson = "[{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\"}]";
importProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
// String processMetaJson = "";
// improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
//
// processMetaJson = "{\"scheduleWorkerGroupId\":-1}";
// improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
//
// processMetaJson = "{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\"}";
// improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
//
// processMetaJson = "{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\",\"processDefinitionName\":\"test_definition\"}";
// improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
processMetaJson = "[{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\",\"processDefinitionName\":\"test_definition\"}]";
importProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
}
@ -773,7 +740,7 @@ public class ProcessDefinitionServiceTest {
* @param processMetaJson process meta json
* @throws IOException IO exception
*/
private void importProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws IOException {
private void improssProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws IOException {
//check null
FileUtils.writeStringToFile(new File("/tmp/task.json"),processMetaJson);

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -80,8 +80,7 @@ public class ProcessInstanceServiceTest {
@Mock
LoggerService loggerService;
@Mock
WorkerGroupMapper workerGroupMapper;
@Mock
UsersService usersService;
@ -486,7 +485,6 @@ public class ProcessInstanceServiceTest {
*/
private WorkerGroup getWorkGroup() {
WorkerGroup workerGroup = new WorkerGroup();
workerGroup.setId(1);
workerGroup.setName("test_workergroup");
return workerGroup;
}

18
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java

@ -98,41 +98,42 @@ public class UsersServiceTest {
String email = "123@qq.com";
int tenantId = Integer.MAX_VALUE;
String phone= "13456432345";
int state = 1;
try {
//userName error
Map<String, Object> result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName);
Map<String, Object> result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName, state);
logger.info(result.toString());
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
userName = "userTest0001";
userPassword = "userTest000111111111111111";
//password error
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName);
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName, state);
logger.info(result.toString());
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
userPassword = "userTest0001";
email = "1q.com";
//email error
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName);
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName, state);
logger.info(result.toString());
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
email = "122222@qq.com";
phone ="2233";
//phone error
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName);
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName, state);
logger.info(result.toString());
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
phone = "13456432345";
//tenantId not exists
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName);
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName, state);
logger.info(result.toString());
Assert.assertEquals(Status.TENANT_NOT_EXIST, result.get(Constants.STATUS));
//success
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
result = usersService.createUser(user, userName, userPassword, email, 1, phone, queueName);
result = usersService.createUser(user, userName, userPassword, email, 1, phone, queueName, state);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
@ -225,13 +226,13 @@ public class UsersServiceTest {
String userPassword = "userTest0001";
try {
//user not exist
Map<String, Object> result = usersService.updateUser(0,userName,userPassword,"3443@qq.com",1,"13457864543","queue");
Map<String, Object> result = usersService.updateUser(0,userName,userPassword,"3443@qq.com",1,"13457864543","queue", 1);
Assert.assertEquals(Status.USER_NOT_EXIST, result.get(Constants.STATUS));
logger.info(result.toString());
//success
when(userMapper.selectById(1)).thenReturn(getUser());
result = usersService.updateUser(1,userName,userPassword,"32222s@qq.com",1,"13457864543","queue");
result = usersService.updateUser(1,userName,userPassword,"32222s@qq.com",1,"13457864543","queue", 1);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
} catch (Exception e) {
@ -482,6 +483,7 @@ public class UsersServiceTest {
user.setUserType(UserType.ADMIN_USER);
user.setUserName("userTest0001");
user.setUserPassword("userTest0001");
user.setState(1);
return user;
}

116
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java

@ -26,10 +26,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@RunWith(MockitoJUnitRunner.class)
public class WorkerGroupServiceTest {
@ -51,40 +52,34 @@ public class WorkerGroupServiceTest {
@InjectMocks
private WorkerGroupService workerGroupService;
@Mock
private WorkerGroupMapper workerGroupMapper;
@Mock
private ProcessInstanceMapper processInstanceMapper;
@Mock
private ZookeeperCachedOperator zookeeperCachedOperator;
private String groupName="groupName000001";
/**
* create or update a worker group
*/
@Test
public void testSaveWorkerGroup(){
@Before
public void init(){
ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
zookeeperConfig.setDsRoot("/dolphinscheduler_qzw");
Mockito.when(zookeeperCachedOperator.getZookeeperConfig()).thenReturn(zookeeperConfig);
User user = new User();
// general user add
user.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1");
logger.info(result.toString());
Assert.assertEquals( Status.USER_NO_OPERATION_PERM.getMsg(),(String) result.get(Constants.MSG));
String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker";
//success
user.setUserType(UserType.ADMIN_USER);
result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1");
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),(String)result.get(Constants.MSG));
// group name exist
Mockito.when(workerGroupMapper.selectById(2)).thenReturn(getWorkerGroup(2));
Mockito.when(workerGroupMapper.queryWorkerGroupByName(groupName)).thenReturn(getList());
result = workerGroupService.saveWorkerGroup(user, 2, groupName, "127.0.0.1");
logger.info(result.toString());
Assert.assertEquals(Status.NAME_EXIST,result.get(Constants.STATUS));
List<String> workerGroupStrList = new ArrayList<>();
workerGroupStrList.add("default");
workerGroupStrList.add("test");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList);
List<String> defaultIpList = new ArrayList<>();
defaultIpList.add("192.168.220.188:1234");
defaultIpList.add("192.168.220.189:1234");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultIpList);
Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultIpList.get(0))).thenReturn("0.02,0.23,0.03,2020-05-08 11:24:14,2020-05-08 14:22:24");
}
/**
@ -92,59 +87,20 @@ public class WorkerGroupServiceTest {
*/
@Test
public void testQueryAllGroupPaging(){
User user = new User();
// general user add
user.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = workerGroupService.queryAllGroupPaging(user, 1, 10, groupName);
logger.info(result.toString());
Assert.assertEquals((String) result.get(Constants.MSG), Status.USER_NO_OPERATION_PERM.getMsg());
//success
user.setUserType(UserType.ADMIN_USER);
Page<WorkerGroup> page = new Page<>(1,10);
page.setRecords(getList());
page.setSize(1L);
Mockito.when(workerGroupMapper.queryListPaging(Mockito.any(Page.class), Mockito.eq(groupName))).thenReturn(page);
result = workerGroupService.queryAllGroupPaging(user, 1, 10, groupName);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),(String)result.get(Constants.MSG));
PageInfo<WorkerGroup> pageInfo = (PageInfo<WorkerGroup>) result.get(Constants.DATA_LIST);
Assert.assertTrue(CollectionUtils.isNotEmpty(pageInfo.getLists()));
Map<String, Object> result = workerGroupService.queryAllGroupPaging(user, 1, 10, null);
PageInfo<WorkerGroup> pageInfo = (PageInfo) result.get(Constants.DATA_LIST);
Assert.assertEquals(pageInfo.getLists().size(),1);
}
/**
* delete group by id
*/
@Test
public void testDeleteWorkerGroupById(){
//DELETE_WORKER_GROUP_BY_ID_FAIL
Mockito.when(processInstanceMapper.queryByWorkerGroupIdAndStatus(1, Constants.NOT_TERMINATED_STATES)).thenReturn(getProcessInstanceList());
Map<String, Object> result = workerGroupService.deleteWorkerGroupById(1);
logger.info(result.toString());
Assert.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(),((Status) result.get(Constants.STATUS)).getCode());
//correct
result = workerGroupService.deleteWorkerGroupById(2);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),(String)result.get(Constants.MSG));
}
@Test
public void testQueryAllGroup() throws Exception {
ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
zookeeperConfig.setDsRoot("/ds");
Mockito.when(zookeeperCachedOperator.getZookeeperConfig()).thenReturn(zookeeperConfig);
List<String> workerGroupStrList = new ArrayList<>();
workerGroupStrList.add("workerGroup1");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(Mockito.anyString())).thenReturn(workerGroupStrList);
Map<String, Object> result = workerGroupService.queryAllGroup();
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),(String)result.get(Constants.MSG));
List<WorkerGroup> workerGroupList = (List<WorkerGroup>) result.get(Constants.DATA_LIST);
Assert.assertTrue(workerGroupList.size()>0);
Set<String> workerGroups = (Set<String>) result.get(Constants.DATA_LIST);
Assert.assertEquals(workerGroups.size(), 1);
}
@ -158,25 +114,5 @@ public class WorkerGroupServiceTest {
processInstances.add(new ProcessInstance());
return processInstances;
}
/**
* get Group
* @return
*/
private WorkerGroup getWorkerGroup(int id){
WorkerGroup workerGroup = new WorkerGroup();
workerGroup.setName(groupName);
workerGroup.setId(id);
return workerGroup;
}
private WorkerGroup getWorkerGroup(){
return getWorkerGroup(1);
}
private List<WorkerGroup> getList(){
List<WorkerGroup> list = new ArrayList<>();
list.add(getWorkerGroup());
return list;
}
}

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java

@ -76,7 +76,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsFailure(){
return this == FAILURE || this == NEED_FAULT_TOLERANCE;
return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL;
}
/**

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java

@ -187,7 +187,9 @@ public class DataxParameters extends AbstractParameters {
@Override
public boolean checkParameters() {
if (customConfig == null) return false;
if (customConfig == null) {
return false;
}
if (customConfig == 0) {
return dataSource != 0
&& dataTarget != 0

11
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -57,6 +57,12 @@ public class OSUtils {
private OSUtils() {}
/**
* Initialization regularization, solve the problem of pre-compilation performance,
* avoid the thread safety problem of multi-thread operation
*/
private static final Pattern PATTERN = Pattern.compile("\\s+");
/**
* get memory usage
@ -219,8 +225,7 @@ public class OSUtils {
List<String> users = new ArrayList<>();
while (startPos <= endPos) {
Pattern pattern = Pattern.compile("\\s+");
users.addAll(Arrays.asList(pattern.split(lines[startPos])));
users.addAll(Arrays.asList(PATTERN.split(lines[startPos])));
startPos++;
}
@ -313,7 +318,7 @@ public class OSUtils {
String currentProcUserName = System.getProperty("user.name");
String result = exeCmd(String.format("net user \"%s\"", currentProcUserName));
String line = result.split("\n")[22];
String group = Pattern.compile("\\s+").split(line)[1];
String group = PATTERN.split(line)[1];
if (group.charAt(0) == '*') {
return group.substring(1);
} else {

86
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32.java

@ -189,8 +189,9 @@ public class ProcessBuilderForWin32 {
* @throws NullPointerException if the argument is null
*/
public ProcessBuilderForWin32(List<String> command) {
if (command == null)
if (command == null) {
throw new NullPointerException();
}
this.command = command;
}
@ -207,9 +208,10 @@ public class ProcessBuilderForWin32 {
*/
public ProcessBuilderForWin32(String... command) {
this.command = new ArrayList<>(command.length);
for (String arg : command)
for (String arg : command) {
this.command.add(arg);
}
}
/**
* set username and password for process
@ -238,8 +240,9 @@ public class ProcessBuilderForWin32 {
* @throws NullPointerException if the argument is null
*/
public ProcessBuilderForWin32 command(List<String> command) {
if (command == null)
if (command == null) {
throw new NullPointerException();
}
this.command = command;
return this;
}
@ -257,8 +260,9 @@ public class ProcessBuilderForWin32 {
*/
public ProcessBuilderForWin32 command(String... command) {
this.command = new ArrayList<>(command.length);
for (String arg : command)
for (String arg : command) {
this.command.add(arg);
}
return this;
}
@ -344,11 +348,13 @@ public class ProcessBuilderForWin32 {
*/
public Map<String,String> environment() {
SecurityManager security = System.getSecurityManager();
if (security != null)
if (security != null) {
security.checkPermission(new RuntimePermission("getenv.*"));
}
if (environment == null)
if (environment == null) {
environment = ProcessEnvironmentForWin32.environment();
}
assert environment != null;
@ -369,17 +375,19 @@ public class ProcessBuilderForWin32 {
// for compatibility with old broken code.
// Silently discard any trailing junk.
if (envstring.indexOf((int) '\u0000') != -1)
if (envstring.indexOf((int) '\u0000') != -1) {
envstring = envstring.replaceFirst("\u0000.*", "");
}
int eqlsign =
envstring.indexOf('=', ProcessEnvironmentForWin32.MIN_NAME_LENGTH);
// Silently ignore envstrings lacking the required `='.
if (eqlsign != -1)
if (eqlsign != -1) {
environment.put(envstring.substring(0,eqlsign),
envstring.substring(eqlsign+1));
}
}
}
return this;
}
@ -425,6 +433,7 @@ public class ProcessBuilderForWin32 {
static class NullInputStream extends InputStream {
static final ProcessBuilderForWin32.NullInputStream INSTANCE = new ProcessBuilderForWin32.NullInputStream();
private NullInputStream() {}
@Override
public int read() { return -1; }
@Override
public int available() { return 0; }
@ -436,6 +445,7 @@ public class ProcessBuilderForWin32 {
static class NullOutputStream extends OutputStream {
static final ProcessBuilderForWin32.NullOutputStream INSTANCE = new ProcessBuilderForWin32.NullOutputStream();
private NullOutputStream() {}
@Override
public void write(int b) throws IOException {
throw new IOException("Stream closed");
}
@ -516,7 +526,9 @@ public class ProcessBuilderForWin32 {
* }</pre>
*/
public static final ProcessBuilderForWin32.Redirect PIPE = new ProcessBuilderForWin32.Redirect() {
@Override
public Type type() { return Type.PIPE; }
@Override
public String toString() { return type().toString(); }};
/**
@ -531,7 +543,9 @@ public class ProcessBuilderForWin32 {
* }</pre>
*/
public static final ProcessBuilderForWin32.Redirect INHERIT = new ProcessBuilderForWin32.Redirect() {
@Override
public Type type() { return Type.INHERIT; }
@Override
public String toString() { return type().toString(); }};
/**
@ -565,12 +579,15 @@ public class ProcessBuilderForWin32 {
* @return a redirect to read from the specified file
*/
public static ProcessBuilderForWin32.Redirect from(final File file) {
if (file == null)
if (file == null) {
throw new NullPointerException();
}
return new ProcessBuilderForWin32.Redirect() {
@Override
public Type type() { return Type.READ; }
@Override
public File file() { return file; }
@Override
public String toString() {
return "redirect to read from file \"" + file + "\"";
}
@ -593,12 +610,15 @@ public class ProcessBuilderForWin32 {
* @return a redirect to write to the specified file
*/
public static ProcessBuilderForWin32.Redirect to(final File file) {
if (file == null)
if (file == null) {
throw new NullPointerException();
}
return new ProcessBuilderForWin32.Redirect() {
@Override
public Type type() { return Type.WRITE; }
@Override
public File file() { return file; }
@Override
public String toString() {
return "redirect to write to file \"" + file + "\"";
}
@ -626,12 +646,15 @@ public class ProcessBuilderForWin32 {
* @return a redirect to append to the specified file
*/
public static ProcessBuilderForWin32.Redirect appendTo(final File file) {
if (file == null)
if (file == null) {
throw new NullPointerException();
}
return new ProcessBuilderForWin32.Redirect() {
@Override
public Type type() { return Type.APPEND; }
@Override
public File file() { return file; }
@Override
public String toString() {
return "redirect to append to file \"" + file + "\"";
}
@ -647,14 +670,18 @@ public class ProcessBuilderForWin32 {
* instances of the same type associated with non-null equal
* {@code File} instances.
*/
@Override
public boolean equals(Object obj) {
if (obj == this)
if (obj == this) {
return true;
if (! (obj instanceof ProcessBuilderForWin32.Redirect))
}
if (! (obj instanceof ProcessBuilderForWin32.Redirect)) {
return false;
}
ProcessBuilderForWin32.Redirect r = (ProcessBuilderForWin32.Redirect) obj;
if (r.type() != this.type())
if (r.type() != this.type()) {
return false;
}
assert this.file() != null;
return this.file().equals(r.file());
}
@ -663,13 +690,15 @@ public class ProcessBuilderForWin32 {
* Returns a hash code value for this {@code Redirect}.
* @return a hash code value for this {@code Redirect}
*/
@Override
public int hashCode() {
File file = file();
if (file == null)
if (file == null) {
return super.hashCode();
else
} else {
return file.hashCode();
}
}
/**
* No public constructors. Clients must use predefined
@ -679,10 +708,11 @@ public class ProcessBuilderForWin32 {
}
private ProcessBuilderForWin32.Redirect[] redirects() {
if (redirects == null)
redirects = new ProcessBuilderForWin32.Redirect[] {
ProcessBuilderForWin32.Redirect.PIPE, ProcessBuilderForWin32.Redirect.PIPE, ProcessBuilderForWin32.Redirect.PIPE
if (redirects == null) {
redirects = new Redirect[] {
Redirect.PIPE, Redirect.PIPE, Redirect.PIPE
};
}
return redirects;
}
@ -711,9 +741,10 @@ public class ProcessBuilderForWin32 {
*/
public ProcessBuilderForWin32 redirectInput(ProcessBuilderForWin32.Redirect source) {
if (source.type() == ProcessBuilderForWin32.Redirect.Type.WRITE ||
source.type() == ProcessBuilderForWin32.Redirect.Type.APPEND)
source.type() == ProcessBuilderForWin32.Redirect.Type.APPEND) {
throw new IllegalArgumentException(
"Redirect invalid for reading: " + source);
}
redirects()[0] = source;
return this;
}
@ -741,9 +772,10 @@ public class ProcessBuilderForWin32 {
* @since 1.7
*/
public ProcessBuilderForWin32 redirectOutput(ProcessBuilderForWin32.Redirect destination) {
if (destination.type() == ProcessBuilderForWin32.Redirect.Type.READ)
if (destination.type() == ProcessBuilderForWin32.Redirect.Type.READ) {
throw new IllegalArgumentException(
"Redirect invalid for writing: " + destination);
}
redirects()[1] = destination;
return this;
}
@ -775,9 +807,10 @@ public class ProcessBuilderForWin32 {
* @since 1.7
*/
public ProcessBuilderForWin32 redirectError(ProcessBuilderForWin32.Redirect destination) {
if (destination.type() == ProcessBuilderForWin32.Redirect.Type.READ)
if (destination.type() == ProcessBuilderForWin32.Redirect.Type.READ) {
throw new IllegalArgumentException(
"Redirect invalid for writing: " + destination);
}
redirects()[2] = destination;
return this;
}
@ -1019,15 +1052,18 @@ public class ProcessBuilderForWin32 {
String[] cmdarray = command.toArray(new String[command.size()]);
cmdarray = cmdarray.clone();
for (String arg : cmdarray)
if (arg == null)
for (String arg : cmdarray) {
if (arg == null) {
throw new NullPointerException();
}
}
// Throws IndexOutOfBoundsException if command is empty
String prog = cmdarray[0];
SecurityManager security = System.getSecurityManager();
if (security != null)
if (security != null) {
security.checkExec(prog);
}
String dir = directory == null ? null : directory.toString();

38
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java

@ -27,22 +27,25 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
private static String validateName(String name) {
// An initial `=' indicates a magic Windows variable name -- OK
if (name.indexOf('=', 1) != -1 ||
name.indexOf('\u0000') != -1)
name.indexOf('\u0000') != -1) {
throw new IllegalArgumentException
("Invalid environment variable name: \"" + name + "\"");
}
return name;
}
private static String validateValue(String value) {
if (value.indexOf('\u0000') != -1)
if (value.indexOf('\u0000') != -1) {
throw new IllegalArgumentException
("Invalid environment variable value: \"" + value + "\"");
}
return value;
}
private static String nonNullString(Object o) {
if (o == null)
if (o == null) {
throw new NullPointerException();
}
return (String) o;
}
@ -70,26 +73,38 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
private static class CheckedEntry implements Entry<String,String> {
private final Entry<String,String> e;
public CheckedEntry(Entry<String,String> e) {this.e = e;}
@Override
public String getKey() { return e.getKey();}
@Override
public String getValue() { return e.getValue();}
@Override
public String setValue(String value) {
return e.setValue(validateValue(value));
}
@Override
public String toString() { return getKey() + "=" + getValue();}
@Override
public boolean equals(Object o) {return e.equals(o);}
@Override
public int hashCode() {return e.hashCode();}
}
private static class CheckedEntrySet extends AbstractSet<Entry<String,String>> {
private final Set<Entry<String,String>> s;
public CheckedEntrySet(Set<Entry<String,String>> s) {this.s = s;}
@Override
public int size() {return s.size();}
@Override
public boolean isEmpty() {return s.isEmpty();}
@Override
public void clear() { s.clear();}
@Override
public Iterator<Entry<String,String>> iterator() {
return new Iterator<Entry<String,String>>() {
Iterator<Entry<String,String>> i = s.iterator();
@Override
public boolean hasNext() { return i.hasNext();}
@Override
public Entry<String,String> next() {
return new CheckedEntry(i.next());
}
@ -104,18 +119,22 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
nonNullString(e.getValue());
return e;
}
@Override
public boolean contains(Object o) {return s.contains(checkedEntry(o));}
@Override
public boolean remove(Object o) {return s.remove(checkedEntry(o));}
}
private static class CheckedValues extends AbstractCollection<String> {
private final Collection<String> c;
public CheckedValues(Collection<String> c) {this.c = c;}
@Override
public int size() {return c.size();}
@Override
public boolean isEmpty() {return c.isEmpty();}
@Override
public void clear() { c.clear();}
@Override
public Iterator<String> iterator() {return c.iterator();}
@Override
public boolean contains(Object o) {return c.contains(nonNullString(o));}
@ -126,11 +145,17 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
private static class CheckedKeySet extends AbstractSet<String> {
private final Set<String> s;
public CheckedKeySet(Set<String> s) {this.s = s;}
@Override
public int size() {return s.size();}
@Override
public boolean isEmpty() {return s.isEmpty();}
@Override
public void clear() { s.clear();}
@Override
public Iterator<String> iterator() {return s.iterator();}
@Override
public boolean contains(Object o) {return s.contains(nonNullString(o));}
@Override
public boolean remove(Object o) {return s.remove(nonNullString(o));}
}
@Override
@ -147,6 +172,7 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
}
private static final class NameComparator implements Comparator<String> {
@Override
public int compare(String s1, String s2) {
// We can't use String.compareToIgnoreCase since it
// canonicalizes to lower case, while Windows
@ -163,14 +189,17 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
c2 = Character.toUpperCase(c2);
if (c1 != c2)
// No overflow because of numeric promotion
{
return c1 - c2;
}
}
}
return n1 - n2;
}
}
private static final class EntryComparator implements Comparator<Entry<String,String>> {
@Override
public int compare(Entry<String,String> e1,
Entry<String,String> e2) {
return nameComparator.compare(e1.getKey(), e2.getKey());
@ -278,9 +307,10 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
// add the environment variable to the child, if it exists in parent
private static void addToEnvIfSet(StringBuilder sb, String name) {
String s = getenv(name);
if (s != null)
if (s != null) {
addToEnv(sb, name, s);
}
}
private static void addToEnv(StringBuilder sb, String name, String val) {
sb.append(name).append('=').append(val).append('\u0000');

89
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java

@ -93,13 +93,15 @@ public class ProcessImplForWin32 extends Process {
if (append) {
String path = f.getPath();
SecurityManager sm = System.getSecurityManager();
if (sm != null)
if (sm != null) {
sm.checkWrite(path);
}
long handle = openForAtomicAppend(path);
final FileDescriptor fd = new FileDescriptor();
setHandle(fd, handle);
return AccessController.doPrivileged(
new PrivilegedAction<FileOutputStream>() {
@Override
public FileOutputStream run() {
return new FileOutputStream(fd);
}
@ -133,30 +135,30 @@ public class ProcessImplForWin32 extends Process {
} else {
stdHandles = new long[3];
if (redirects[0] == ProcessBuilderForWin32.Redirect.PIPE)
if (redirects[0] == ProcessBuilderForWin32.Redirect.PIPE) {
stdHandles[0] = -1L;
else if (redirects[0] == ProcessBuilderForWin32.Redirect.INHERIT)
} else if (redirects[0] == ProcessBuilderForWin32.Redirect.INHERIT) {
stdHandles[0] = getHandle(FileDescriptor.in);
else {
} else {
f0 = new FileInputStream(redirects[0].file());
stdHandles[0] = getHandle(f0.getFD());
}
if (redirects[1] == ProcessBuilderForWin32.Redirect.PIPE)
if (redirects[1] == ProcessBuilderForWin32.Redirect.PIPE) {
stdHandles[1] = -1L;
else if (redirects[1] == ProcessBuilderForWin32.Redirect.INHERIT)
} else if (redirects[1] == ProcessBuilderForWin32.Redirect.INHERIT) {
stdHandles[1] = getHandle(FileDescriptor.out);
else {
} else {
f1 = newFileOutputStream(redirects[1].file(),
redirects[1].append());
stdHandles[1] = getHandle(f1.getFD());
}
if (redirects[2] == ProcessBuilderForWin32.Redirect.PIPE)
if (redirects[2] == ProcessBuilderForWin32.Redirect.PIPE) {
stdHandles[2] = -1L;
else if (redirects[2] == ProcessBuilderForWin32.Redirect.INHERIT)
} else if (redirects[2] == ProcessBuilderForWin32.Redirect.INHERIT) {
stdHandles[2] = getHandle(FileDescriptor.err);
else {
} else {
f2 = newFileOutputStream(redirects[2].file(),
redirects[2].append());
stdHandles[2] = getHandle(f2.getFD());
@ -167,10 +169,19 @@ public class ProcessImplForWin32 extends Process {
} finally {
// In theory, close() can throw IOException
// (although it is rather unlikely to happen here)
try { if (f0 != null) f0.close(); }
try { if (f0 != null) {
f0.close();
}
}
finally {
try { if (f1 != null) f1.close(); }
finally { if (f2 != null) f2.close(); }
try { if (f1 != null) {
f1.close();
}
}
finally { if (f2 != null) {
f2.close();
}
}
}
}
@ -193,8 +204,9 @@ public class ProcessImplForWin32 extends Process {
private static String[] getTokensFromCommand(String command) {
ArrayList<String> matchList = new ArrayList<>(8);
Matcher regexMatcher = ProcessImplForWin32.LazyPattern.PATTERN.matcher(command);
while (regexMatcher.find())
while (regexMatcher.find()) {
matchList.add(regexMatcher.group());
}
return matchList.toArray(new String[matchList.size()]);
}
@ -378,8 +390,9 @@ public class ProcessImplForWin32 extends Process {
// .bat files don't include backslashes as part of the quote
private static int countLeadingBackslash(int verificationType,
CharSequence input, int start) {
if (verificationType == VERIFICATION_CMD_BAT)
if (verificationType == VERIFICATION_CMD_BAT) {
return 0;
}
int j;
for (j = start - 1; j >= 0 && input.charAt(j) == BACKSLASH; j--) {
// just scanning backwards
@ -417,8 +430,9 @@ public class ProcessImplForWin32 extends Process {
String executablePath = new File(cmd[0]).getPath();
// No worry about internal, unpaired ["], and redirection/piping.
if (needsEscaping(VERIFICATION_LEGACY, executablePath) )
if (needsEscaping(VERIFICATION_LEGACY, executablePath) ) {
executablePath = quoteString(executablePath);
}
cmdstr = createCommandLine(
//legacy mode doesn't worry about extended verification
@ -442,17 +456,19 @@ public class ProcessImplForWin32 extends Process {
// Restore original command line.
StringBuilder join = new StringBuilder();
// terminal space in command line is ok
for (String s : cmd)
for (String s : cmd) {
join.append(s).append(' ');
}
// Parse the command line again.
cmd = getTokensFromCommand(join.toString());
executablePath = getExecutablePath(cmd[0]);
// Check new executable name once more
if (security != null)
if (security != null) {
security.checkExec(executablePath);
}
}
// Quotation protects from interpretation of the [path] argument as
// start of longer path with spaces. Quotation has no influence to
@ -471,28 +487,29 @@ public class ProcessImplForWin32 extends Process {
AccessController.doPrivileged(
new PrivilegedAction<Void>() {
@Override
public Void run() {
if (stdHandles[0] == -1L)
if (stdHandles[0] == -1L) {
stdinStream = ProcessBuilderForWin32.NullOutputStream.INSTANCE;
else {
} else {
FileDescriptor stdinFd = new FileDescriptor();
setHandle(stdinFd, stdHandles[0]);
stdinStream = new BufferedOutputStream(
new FileOutputStream(stdinFd));
}
if (stdHandles[1] == -1L)
if (stdHandles[1] == -1L) {
stdoutStream = ProcessBuilderForWin32.NullInputStream.INSTANCE;
else {
} else {
FileDescriptor stdoutFd = new FileDescriptor();
setHandle(stdoutFd, stdHandles[1]);
stdoutStream = new BufferedInputStream(
new FileInputStream(stdoutFd));
}
if (stdHandles[2] == -1L)
if (stdHandles[2] == -1L) {
stderrStream = ProcessBuilderForWin32.NullInputStream.INSTANCE;
else {
} else {
FileDescriptor stderrFd = new FileDescriptor();
setHandle(stderrFd, stdHandles[2]);
stderrStream = new FileInputStream(stderrFd);
@ -501,33 +518,41 @@ public class ProcessImplForWin32 extends Process {
return null; }});
}
@Override
public OutputStream getOutputStream() {
return stdinStream;
}
@Override
public InputStream getInputStream() {
return stdoutStream;
}
@Override
public InputStream getErrorStream() {
return stderrStream;
}
@Override
protected void finalize() {
closeHandle(handle);
}
@Override
public int exitValue() {
int exitCode = getExitCodeProcess(handle);
if (exitCode == STILL_ACTIVE)
if (exitCode == STILL_ACTIVE) {
throw new IllegalThreadStateException("process has not exited");
}
return exitCode;
}
@Override
public int waitFor() throws InterruptedException {
waitForInterruptibly(handle);
if (Thread.interrupted())
if (Thread.interrupted()) {
throw new InterruptedException();
}
return exitValue();
}
@ -535,8 +560,12 @@ public class ProcessImplForWin32 extends Process {
public boolean waitFor(long timeout, TimeUnit unit)
throws InterruptedException
{
if (getExitCodeProcess(handle) != STILL_ACTIVE) return true;
if (timeout <= 0) return false;
if (getExitCodeProcess(handle) != STILL_ACTIVE) {
return true;
}
if (timeout <= 0) {
return false;
}
long remainingNanos = unit.toNanos(timeout);
long deadline = System.nanoTime() + remainingNanos ;
@ -545,8 +574,9 @@ public class ProcessImplForWin32 extends Process {
// Round up to next millisecond
long msTimeout = TimeUnit.NANOSECONDS.toMillis(remainingNanos + 999_999L);
waitForTimeoutInterruptibly(handle, msTimeout);
if (Thread.interrupted())
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (getExitCodeProcess(handle) != STILL_ACTIVE) {
return true;
}
@ -556,6 +586,7 @@ public class ProcessImplForWin32 extends Process {
return (getExitCodeProcess(handle) != STILL_ACTIVE);
}
@Override
public void destroy() { terminateProcess(handle); }
@Override

18
dolphinscheduler-common/src/main/resources/common.properties

@ -19,22 +19,22 @@
resource.storage.type=NONE
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended
resource.upload.path=/dolphinscheduler
#resource.upload.path=/dolphinscheduler
# user data local directory path, please make sure the directory exists and have read write permissions
#data.basedir.path=/tmp/dolphinscheduler
# whether kerberos starts
hadoop.security.authentication.startup.state=false
#hadoop.security.authentication.startup.state=false
# java.security.krb5.conf path
java.security.krb5.conf.path=/opt/krb5.conf
#java.security.krb5.conf.path=/opt/krb5.conf
# login user from keytab username
login.user.keytab.username=hdfs-mycluster@ESZ.COM
#login.user.keytab.username=hdfs-mycluster@ESZ.COM
# loginUserFromKeytab path
login.user.keytab.path=/opt/hdfs.headless.keytab
#login.user.keytab.path=/opt/hdfs.headless.keytab
#resource.view.suffixs
#resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties
@ -46,18 +46,18 @@ hdfs.root.user=hdfs
fs.defaultFS=hdfs://mycluster:8020
# if resource.storage.type=S3,s3 endpoint
fs.s3a.endpoint=http://192.168.199.91:9010
#fs.s3a.endpoint=http://192.168.199.91:9010
# if resource.storage.type=S3,s3 access key
fs.s3a.access.key=A3DXS30FO22544RE
#fs.s3a.access.key=A3DXS30FO22544RE
# if resource.storage.type=S3,s3 secret key
fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK
#fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK
# if not use hadoop resourcemanager, please keep default value; if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# if resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ark1 to actual resourcemanager hostname.
# If resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ark1 to actual resourcemanager hostname.
yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s
# system env path

20
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstanceMap.java

@ -91,14 +91,24 @@ public class ProcessInstanceMap {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ProcessInstanceMap that = (ProcessInstanceMap) o;
if (id != that.id) return false;
if (parentProcessInstanceId != that.parentProcessInstanceId) return false;
if (parentTaskInstanceId != that.parentTaskInstanceId) return false;
if (id != that.id) {
return false;
}
if (parentProcessInstanceId != that.parentProcessInstanceId) {
return false;
}
if (parentTaskInstanceId != that.parentTaskInstanceId) {
return false;
}
return processInstanceId == that.processInstanceId;
}

16
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Queue.java

@ -104,13 +104,21 @@ public class Queue {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Queue queue1 = (Queue) o;
if (id != queue1.id) return false;
if (!queueName.equals(queue1.queueName)) return false;
if (id != queue1.id) {
return false;
}
if (!queueName.equals(queue1.queueName)) {
return false;
}
return queue.equals(queue1.queue);
}

20
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Session.java

@ -93,14 +93,24 @@ public class Session {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Session session = (Session) o;
if (userId != session.userId) return false;
if (!id.equals(session.id)) return false;
if (!lastLoginTime.equals(session.lastLoginTime)) return false;
if (userId != session.userId) {
return false;
}
if (!id.equals(session.id)) {
return false;
}
if (!lastLoginTime.equals(session.lastLoginTime)) {
return false;
}
return ip.equals(session.ip);
}

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Tenant.java

@ -166,8 +166,12 @@ public class Tenant {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Tenant tenant = (Tenant) o;

14
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/User.java

@ -67,6 +67,11 @@ public class User {
*/
private int tenantId;
/**
* user state
*/
private int state;
/**
* tenant code
*/
@ -219,6 +224,14 @@ public class User {
this.queue = queue;
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -254,6 +267,7 @@ public class User {
", phone='" + phone + '\'' +
", userType=" + userType +
", tenantId=" + tenantId +
", state=" + state +
", tenantCode='" + tenantCode + '\'' +
", tenantName='" + tenantName + '\'' +
", queueName='" + queueName + '\'' +

47
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java

@ -21,41 +21,22 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.util.Date;
import java.util.List;
/**
* worker group for task running
* worker group
*/
@TableName("t_ds_worker_group")
public class WorkerGroup {
@TableId(value="id", type=IdType.AUTO)
private int id;
private String name;
private String ipList;
private List<String> ipList;
private Date createTime;
private Date updateTime;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getIpList() {
return ipList;
}
public void setIpList(String ipList) {
this.ipList = ipList;
}
public Date getCreateTime() {
return createTime;
}
@ -72,18 +53,6 @@ public class WorkerGroup {
this.updateTime = updateTime;
}
@Override
public String toString() {
return "Worker group model{" +
"id= " + id +
",name= " + name +
",ipList= " + ipList +
",createTime= " + createTime +
",updateTime= " + updateTime +
"}";
}
public String getName() {
return name;
}
@ -91,4 +60,14 @@ public class WorkerGroup {
public void setName(String name) {
this.name = name;
}
public List<String> getIpList() {
return ipList;
}
public void setIpList(List<String> ipList) {
this.ipList = ipList;
}
}

54
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java

@ -1,54 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* worker group mapper interface
*/
public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
/**
* query all worker group
* @return worker group list
*/
List<WorkerGroup> queryAllWorkerGroup();
/**
* query worer grouop by name
* @param name name
* @return worker group list
*/
List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name);
/**
* worker group page
* @param page page
* @param searchVal searchVal
* @return worker group IPage
*/
IPage<WorkerGroup> queryListPaging(IPage<WorkerGroup> page,
@Param("searchVal") String searchVal);
}

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/MysqlPerformance.java

@ -56,7 +56,7 @@ public class MysqlPerformance extends BaseDBPerformance{
try (ResultSet rs1 = pstmt.executeQuery("show global variables")) {
while(rs1.next()){
if(rs1.getString(VARIABLE_NAME).equalsIgnoreCase("MAX_CONNECTIONS")){
if("MAX_CONNECTIONS".equalsIgnoreCase(rs1.getString(VARIABLE_NAME))){
monitorRecord.setMaxConnections( Long.parseLong(rs1.getString("value")));
}
}
@ -64,11 +64,11 @@ public class MysqlPerformance extends BaseDBPerformance{
try (ResultSet rs2 = pstmt.executeQuery("show global status")) {
while(rs2.next()){
if(rs2.getString(VARIABLE_NAME).equalsIgnoreCase("MAX_USED_CONNECTIONS")){
if("MAX_USED_CONNECTIONS".equalsIgnoreCase(rs2.getString(VARIABLE_NAME))){
monitorRecord.setMaxUsedConnections(Long.parseLong(rs2.getString("value")));
}else if(rs2.getString(VARIABLE_NAME).equalsIgnoreCase("THREADS_CONNECTED")){
}else if("THREADS_CONNECTED".equalsIgnoreCase(rs2.getString(VARIABLE_NAME))){
monitorRecord.setThreadsConnections(Long.parseLong(rs2.getString("value")));
}else if(rs2.getString(VARIABLE_NAME).equalsIgnoreCase("THREADS_RUNNING")){
}else if("THREADS_RUNNING".equalsIgnoreCase(rs2.getString(VARIABLE_NAME))){
monitorRecord.setThreadsRunningConnections(Long.parseLong(rs2.getString("value")));
}
}

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UserMapper.xml

@ -32,7 +32,7 @@
</select>
<select id="queryUserPaging" resultType="org.apache.dolphinscheduler.dao.entity.User">
select u.id,u.user_name,u.user_password,u.user_type,u.email,u.phone,u.tenant_id,u.create_time,
u.update_time,t.tenant_name,
u.update_time,t.tenant_name,u.state,
case when u.queue <![CDATA[ <> ]]> '' then u.queue else q.queue_name end as queue, q.queue_name
from t_ds_user u
left join t_ds_tenant t on u.tenant_id=t.id

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -46,6 +46,14 @@ public class ProcessUtils {
*/
private final static Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
/**
* Initialization regularization, solve the problem of pre-compilation performance,
* avoid the thread safety problem of multi-thread operation
*/
private static final Pattern MACPATTERN = Pattern.compile("-[+|-]-\\s(\\d+)");
private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)");
/**
* build command line characters
* @param commandList command list
@ -356,10 +364,10 @@ public class ProcessUtils {
// pstree pid get sub pids
if (OSUtils.isMacOS()) {
String pids = OSUtils.exeCmd("pstree -sp " + processId);
mat = Pattern.compile("-[+|-]-\\s(\\d+)").matcher(pids);
mat = MACPATTERN.matcher(pids);
} else {
String pids = OSUtils.exeCmd("pstree -p " + processId);
mat = Pattern.compile("(\\d+)").matcher(pids);
mat = WINDOWSATTERN.matcher(pids);
}
while (mat.find()){

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -71,9 +71,9 @@ public class WorkerServer {
private SpringApplicationContext springApplicationContext;
/**
* master server startup
* worker server startup
*
* master server not use web service
* worker server not use web service
* @param args arguments
*/
public static void main(String[] args) {

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -101,9 +101,15 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
try {
this.doAck(taskExecutionContext);
}catch (Exception e){
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
this.doAck(taskExecutionContext);
}
// submit task
workerExecService.submit(new TaskExecuteThread(taskExecutionContext,taskCallbackService));
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService));
}
private void doAck(TaskExecutionContext taskExecutionContext){

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -18,10 +18,12 @@ package org.apache.dolphinscheduler.server.worker.runner;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@ -131,9 +133,14 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds());
} finally {
try {
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}catch (Exception e){
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}
}
}
/**
* get global paras map

5
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java

@ -113,10 +113,7 @@ public class DependencyConfig {
return Mockito.mock(ResourceMapper.class);
}
@Bean
public WorkerGroupMapper workerGroupMapper(){
return Mockito.mock(WorkerGroupMapper.class);
}
@Bean
public ErrorCommandMapper errorCommandMapper(){

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java

@ -107,10 +107,6 @@ public class TaskCallbackServiceTestConfig {
return Mockito.mock(ResourceMapper.class);
}
@Bean
public WorkerGroupMapper workerGroupMapper(){
return Mockito.mock(WorkerGroupMapper.class);
}
@Bean
public ErrorCommandMapper errorCommandMapper(){

11
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -86,8 +86,7 @@ public class ProcessService {
@Autowired
private ResourceMapper resourceMapper;
@Autowired
private WorkerGroupMapper workerGroupMapper;
@Autowired
private ErrorCommandMapper errorCommandMapper;
@ -1670,15 +1669,7 @@ public class ProcessService {
return queue;
}
/**
* query worker group by id
* @param workerGroupId workerGroupId
* @return WorkerGroup
*/
public WorkerGroup queryWorkerGroupById(int workerGroupId){
return workerGroupMapper.selectById(workerGroupId);
}
/**
* get task worker group

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@ -272,7 +272,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
try {
mutex.release();
} catch (Exception e) {
if(e.getMessage().equals("instance must be started before calling this method")){
if("instance must be started before calling this method".equals(e.getMessage())){
logger.warn("lock release");
}else{
logger.error("lock release failed",e);

10
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/details/index.vue

@ -118,7 +118,7 @@
* up
*/
_onUp: _.debounce(function () {
this.loadingIndex = this.loadingIndex - 2
this.loadingIndex = this.loadingIndex - 3
console.log('_onUp')
this._editorOff()
@ -131,7 +131,7 @@
* down
*/
_onDown: _.debounce(function () {
this.loadingIndex = this.loadingIndex + 2
this.loadingIndex = this.loadingIndex + 3
console.log('_onDown')
this._editorOff()
@ -166,11 +166,11 @@
// down
if ((scrollTop + h) > totalHeight) {
if (this.isData) {
this._onDown()
// this._onDown()
}
}
// up
if (scrollTop < 2) {
if (scrollTop < 3) {
if (this.loadingIndex > 0) {
this._onUp()
}
@ -210,7 +210,7 @@
return {
id: this.$route.params.id,
skipLineNum: parseInt(`${this.loadingIndex ? this.loadingIndex + '000' : 0}`),
limit: parseInt(`${this.loadingIndex ? this.loadingIndex + 2 : 2}000`)
limit: parseInt(`${this.loadingIndex ? this.loadingIndex + 3 : 3}000`)
}
}
},

29
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue

@ -46,7 +46,7 @@
<m-list-box-f v-if="isADMIN">
<template slot="name"><strong>*</strong>{{$t('Tenant')}}</template>
<template slot="content">
<x-select v-model="tenantId">
<x-select v-model="tenantId" style="width: 100%;">
<x-option
v-for="city in tenantList"
:key="city.id"
@ -59,8 +59,8 @@
<m-list-box-f v-if="isADMIN">
<template slot="name">{{$t('Queue')}}</template>
<template slot="content">
<x-select v-model="queueName">
<x-input slot="trigger" slot-scope="{ selectedModel }" readonly :placeholder="$t('Please select a queue')" :value="selectedModel ? selectedModel.label : ''" style="width: 200px;" @on-click-icon.stop="queueName = ''">
<x-select v-model="queueName" style="width: 100%;">
<x-input slot="trigger" slot-scope="{ selectedModel }" readonly :placeholder="$t('Please select a queue')" :value="selectedModel ? selectedModel.label : ''" @on-click-icon.stop="queueName = ''">
<em slot="suffix" class="ans-icon-fail-solid" style="font-size: 15px;cursor: pointer;" v-show="queueName ==''"></em>
<em slot="suffix" class="ans-icon-arrow-down" style="font-size: 12px;" v-show="queueName!=''"></em>
</x-input>
@ -93,6 +93,15 @@
</x-input>
</template>
</m-list-box-f>
<m-list-box-f>
<template slot="name">{{$t('State')}}</template>
<template slot="content">
<x-radio-group v-model="userState" >
<x-radio :label="'1'">{{$t('Enable')}}</x-radio>
<x-radio :label="'0'">{{$t('Disable')}}</x-radio>
</x-radio-group>
</template>
</m-list-box-f>
</div>
</template>
</m-popup>
@ -118,6 +127,7 @@
queueName: '',
email: '',
phone: '',
userState: '1',
tenantList: [],
// Source admin user information
isADMIN: store.state.user.userInfo.userType === 'ADMIN_USER' && router.history.current.name !== 'account'
@ -229,13 +239,20 @@
},
_submit () {
this.$refs['popup'].spinnerLoading = true
let queueCode = '';
//get queue code
if (this.queueName != ''){
queueCode = this.queueList.length > 0 ? _.find(this.queueList, ['id', this.queueName]).code : ''
}
let param = {
userName: this.userName,
userPassword: this.userPassword,
tenantId: this.tenantId,
email: this.email,
queue: this.queueList.length>0? _.find(this.queueList, ['id', this.queueName]).code : '',
phone: this.phone
queue: queueCode,
phone: this.phone,
state: this.userState
}
if (this.item) {
@ -264,6 +281,7 @@
this.userPassword = ''
this.email = this.item.email
this.phone = this.item.phone
this.userState = this.item.state + '' || '1'
this.tenantId = this.item.tenantId
this.$nextTick(() => {
this.queueName = _.find(this.queueList, ['code', this.item.queue]).id||''
@ -276,6 +294,7 @@
this.userPassword = ''
this.email = this.item.email
this.phone = this.item.phone
this.userState = this.state + '' || '1'
this.tenantId = this.item.tenantId
if(this.queueList.length>0) {
this.queueName = _.find(this.queueList, ['code', this.item.queue]).id

8
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue

@ -40,7 +40,9 @@
<th>
<span>{{$t('Phone')}}</span>
</th>
<th id="state">
<span>{{$t('State')}}</span>
</th>
<th>
<span>{{$t('Create Time')}}</span>
</th>
@ -71,6 +73,10 @@
<td>
<span>{{item.phone || '-'}}</span>
</td>
<td>
<span v-if="item.state == 1">{{$t('Enable')}}</span>
<span v-else>{{$t('Disable')}}</span>
</td>
<td>
<span v-if="item.createTime">{{item.createTime | formatDate}}</span>
<span v-else>-</span>

28
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/list.vue

@ -34,9 +34,6 @@
<th>
<span>{{$t('Update Time')}}</span>
</th>
<th width="70">
<span>{{$t('Operation')}}</span>
</th>
</tr>
<tr v-for="(item, $index) in list" :key="$index">
<td>
@ -48,7 +45,7 @@
</span>
</td>
<td>
<span>{{item.ipList}}</span>
<span>{{item.ipList.join(',')}}</span>
</td>
<td>
<span v-if="item.createTime">{{item.createTime | formatDate}}</span>
@ -58,24 +55,6 @@
<span v-if="item.updateTime">{{item.updateTime | formatDate}}</span>
<span v-else>-</span>
</td>
<td>
<x-button type="info" shape="circle" size="xsmall" data-toggle="tooltip" icon="ans-icon-edit" :title="$t('Edit')" @click="_edit(item)">
</x-button>
<x-poptip
:ref="'poptip-delete-' + $index"
placement="bottom-end"
width="90">
<p>{{$t('Delete?')}}</p>
<div style="text-align: right; margin: 0;padding-top: 4px;">
<x-button type="text" size="xsmall" shape="circle" @click="_closeDelete($index)">{{$t('Cancel')}}</x-button>
<x-button type="primary" size="xsmall" shape="circle" @click="_delete(item,$index)">{{$t('Confirm')}}</x-button>
</div>
<template slot="reference">
<x-button type="error" shape="circle" size="xsmall" data-toggle="tooltip" icon="ans-icon-trash" :title="$t('delete')">
</x-button>
</template>
</x-poptip>
</td>
</tr>
</table>
</div>
@ -128,8 +107,7 @@
created () {
this.list = this.workerGroupList
},
mounted () {
},
components: { }
mounted () {},
components: {},
}
</script>

9
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/index.vue

@ -17,11 +17,7 @@
<template>
<m-list-construction :title="$t('Worker group manage')">
<template slot="conditions">
<m-conditions @on-conditions="_onConditions">
<template slot="button-group" v-if="isADMIN">
<x-button type="ghost" size="small" @click="_create('')">{{$t('Create worker group')}}</x-button>
</template>
</m-conditions>
<m-conditions @on-conditions="_onConditions"></m-conditions>
</template>
<template slot="content">
<template v-if="workerGroupList.length || total>0">
@ -141,8 +137,7 @@
this.searchParams.pageNo = _.isEmpty(a.query) ? 1 : a.query.pageNo
}
},
created () {
},
created () {},
mounted () {
this.$modal.destroy()
},

8
dolphinscheduler-ui/src/js/conf/home/router/index.js

@ -374,6 +374,14 @@ const router = new Router({
title: `${i18n.$t('Queue manage')}`
}
},
{
path: '/security/worker-groups',
name: 'worker-groups-manage',
component: resolve => require(['../pages/security/pages/workerGroups/index'], resolve),
meta: {
title: `${i18n.$t('Worker group manage')}`
}
},
{
path: '/security/token',
name: 'token-manage',

9
dolphinscheduler-ui/src/js/module/components/secondaryMenu/_source/menu.js

@ -100,6 +100,15 @@ const menu = {
icon: 'ans-icon-danger-solid',
children: []
},
{
name: `${i18n.$t('Worker group manage')}`,
id: 4,
path: 'worker-groups-manage',
isOpen: true,
disabled: true,
icon: 'ans-icon-diary',
children: []
},
{
name: `${i18n.$t('Queue manage')}`,
id: 3,

6
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -173,7 +173,7 @@ export default {
'Tenant Code': 'Tenant Code',
'Tenant Name': 'Tenant Name',
Queue: 'Queue',
'Please select a queue': 'Please select a queue',
'Please select a queue': 'default is tenant association queue',
'Please enter the tenant code in English': 'Please enter the tenant code in English',
'Please enter tenant code in English': 'Please enter tenant code in English',
'Edit User': 'Edit User',
@ -590,5 +590,7 @@ export default {
'Cannot select the same node for successful branch flow and failed branch flow': 'Cannot select the same node for successful branch flow and failed branch flow',
'Successful branch flow and failed branch flow are required': 'Successful branch flow and failed branch flow are required',
'Unauthorized or deleted resources': 'Unauthorized or deleted resources',
'Please delete all non-existent resources': 'Please delete all non-existent resources'
'Please delete all non-existent resources': 'Please delete all non-existent resources',
'Enable': 'Enable',
'Disable': 'Disable'
}

6
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -493,7 +493,7 @@ export default {
'Statement cannot be empty': '语句不能为空',
'Process Define Count': '工作流定义数',
'Process Instance Running Count': '正在运行的流程数',
'Please select a queue': '请选择队列',
'Please select a queue': '默认为租户关联队列',
'command number of waiting for running': '待执行的命令数',
'failure command number': '执行失败的命令数',
'tasks number of waiting running': '待运行任务数',
@ -590,5 +590,7 @@ export default {
'Cannot select the same node for successful branch flow and failed branch flow': '成功分支流转和失败分支流转不能选择同一个节点',
'Successful branch flow and failed branch flow are required': '成功分支流转和失败分支流转必填',
'Unauthorized or deleted resources': '未授权或已删除资源',
'Please delete all non-existent resources': '请删除所有未授权或已删除资源'
'Please delete all non-existent resources': '请删除所有未授权或已删除资源',
'Enable': '启用',
'Disable': '停用'
}

2
e2e/src/test/java/org/apache/dolphinscheduler/locator/security/UserManageLocator.java

@ -42,7 +42,7 @@ public class UserManageLocator {
public static final By SUBMIT = By.xpath("//div[3]/button[2]/span");
public static final By DELETE_USER_BUTTON = By.xpath("//span[2]/button/i");
public static final By DELETE_USER_BUTTON = By.xpath("//span[2]/button");
public static final By CONFIRM_DELETE_USER_BUTTON = By.xpath("//div[2]/div/button[2]/span");
}

1
sql/dolphinscheduler_mysql.sql

@ -415,6 +415,7 @@ CREATE TABLE `t_ds_process_definition` (
`modify_by` varchar(255) DEFAULT NULL,
`resource_ids` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `process_definition_unique` (`name`,`project_id`),
KEY `process_definition_index` (`project_id`,`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

2
sql/soft_version

@ -1 +1 @@
1.2.2
1.3.0

21
sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql

@ -357,3 +357,24 @@ delimiter ;
CALL dc_dolphin_T_t_ds_error_command_D_worker_group_id;
DROP PROCEDURE dc_dolphin_T_t_ds_error_command_D_worker_group_id;
-- ac_dolphin_T_t_ds_user_A_state
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_user_A_state;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_user_A_state()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_user'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='state')
THEN
ALTER TABLE t_ds_user ADD `state` int(1) DEFAULT 1 COMMENT 'state 0:disable 1:enable';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_user_A_state;
DROP PROCEDURE ac_dolphin_T_t_ds_user_A_state;

20
sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

@ -364,3 +364,23 @@ select dc_dolphin_T_t_ds_error_command_D_worker_group_id();
DROP FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id();
-- ac_dolphin_T_t_ds_user_A_state
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_user_A_state();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_user_A_state() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_user'
AND COLUMN_NAME ='state')
THEN
ALTER TABLE t_ds_user ADD COLUMN state int DEFAULT 1;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_user_A_state();
DROP FUNCTION ac_dolphin_T_t_ds_user_A_state();
Loading…
Cancel
Save