From 2888907376344f0760904ad2be2d0074ee77b599 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Wed, 17 Apr 2019 13:55:25 +0800 Subject: [PATCH] user-specified queue function development --- .../api/controller/UsersController.java | 14 +++--- .../escheduler/api/service/UsersService.java | 13 ++++- escheduler-dao/readme.txt | 1 + .../java/cn/escheduler/dao/ProcessDao.java | 20 ++++++-- .../cn/escheduler/dao/mapper/UserMapper.java | 9 ++++ .../dao/mapper/UserMapperProvider.java | 18 +++++++ .../java/cn/escheduler/dao/model/User.java | 49 ++++++++++++------- .../escheduler/dao/mapper/UserMapperTest.java | 6 +++ .../worker/runner/TaskScheduleThread.java | 11 ++++- 9 files changed, 110 insertions(+), 31 deletions(-) create mode 100644 escheduler-dao/readme.txt diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/UsersController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/UsersController.java index d217314625..d72e192f51 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/UsersController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/UsersController.java @@ -64,13 +64,14 @@ public class UsersController extends BaseController{ @RequestParam(value = "userName") String userName, @RequestParam(value = "userPassword") String userPassword, @RequestParam(value = "tenantId") int tenantId, + @RequestParam(value = "queue") String queue, @RequestParam(value = "email") String email, @RequestParam(value = "phone", required = false) String phone) { - logger.info("login user {}, create user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, proxyUsers: {}", - loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone); + logger.info("login user {}, create user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}", + loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone,queue); try { - Map result = usersService.createUser(loginUser, userName, userPassword, email, tenantId, phone); + Map result = usersService.createUser(loginUser, userName, userPassword,email,tenantId, phone,queue); return returnDataList(result); }catch (Exception e){ logger.error(CREATE_USER_ERROR.getMsg(),e); @@ -127,13 +128,14 @@ public class UsersController extends BaseController{ @RequestParam(value = "id") int id, @RequestParam(value = "userName") String userName, @RequestParam(value = "userPassword") String userPassword, + @RequestParam(value = "queue") String queue, @RequestParam(value = "email") String email, @RequestParam(value = "tenantId") int tenantId, @RequestParam(value = "phone", required = false) String phone) { - logger.info("login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, proxyUsers: {}", - loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone); + logger.info("login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}", + loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone,queue); try { - Map result = usersService.updateUser(id,userName,userPassword,email,tenantId,phone); + Map result = usersService.updateUser(id,userName,userPassword,email,tenantId,phone,queue); return returnDataList(result); }catch (Exception e){ logger.error(UPDATE_USER_ERROR.getMsg(),e); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java index 8bf815ca87..82e1a850f5 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java @@ -87,7 +87,8 @@ public class UsersService extends BaseService { String userPassword, String email, int tenantId, - String phone) throws Exception { + String phone, + String queue) throws Exception { Map result = new HashMap<>(5); result = CheckUtils.checkUserParams(userName, userPassword, email, phone); @@ -114,6 +115,7 @@ public class UsersService extends BaseService { user.setUserType(UserType.GENERAL_USER); user.setCreateTime(now); user.setUpdateTime(now); + user.setQueue(queue); // save user userMapper.insert(user); @@ -194,7 +196,13 @@ public class UsersService extends BaseService { * @param phone * @return */ - public Map updateUser(int userId, String userName, String userPassword, String email, int tenantId, String phone) throws Exception { + public Map updateUser(int userId, + String userName, + String userPassword, + String email, + int tenantId, + String phone, + String queue) throws Exception { Map result = new HashMap<>(5); result.put(Constants.STATUS, false); @@ -218,6 +226,7 @@ public class UsersService extends BaseService { if (StringUtils.isNotEmpty(email)) { user.setEmail(email); } + user.setQueue(queue); user.setPhone(phone); user.setUpdateTime(now); diff --git a/escheduler-dao/readme.txt b/escheduler-dao/readme.txt new file mode 100644 index 0000000000..d3659dc68a --- /dev/null +++ b/escheduler-dao/readme.txt @@ -0,0 +1 @@ +alter table t_escheduler_user add queue varchar(64); \ No newline at end of file diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index cf217c95aa..e6156a0f4c 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -59,7 +59,7 @@ public class ProcessDao extends AbstractBaseDao { ExecutionStatus.READY_STOP.ordinal()}; @Autowired - private ProjectMapper projectMapper; + private UserMapper userMapper; @Autowired private ProcessDefinitionMapper processDefineMapper; @@ -102,7 +102,7 @@ public class ProcessDao extends AbstractBaseDao { */ @Override protected void init() { - projectMapper = getMapper(ProjectMapper.class); + userMapper=getMapper(UserMapper.class); processDefineMapper = getMapper(ProcessDefinitionMapper.class); processInstanceMapper = getMapper(ProcessInstanceMapper.class); dataSourceMapper = getMapper(DataSourceMapper.class); @@ -261,7 +261,7 @@ public class ProcessDao extends AbstractBaseDao { public ProcessInstance findProcessInstanceByScheduleTime(int defineId, Date scheduleTime){ return processInstanceMapper.queryByScheduleTime(defineId, - DateUtils.dateToString(scheduleTime), 0,null, null); + DateUtils.dateToString(scheduleTime), 0, null, null); } /** @@ -1210,7 +1210,7 @@ public class ProcessDao extends AbstractBaseDao { public int updateProcessInstance(Integer processInstanceId, String processJson, String globalParams, Date scheduleTime, Flag flag, String locations, String connects){ - return processInstanceMapper.updateProcessInstance( processInstanceId, processJson, + return processInstanceMapper.updateProcessInstance(processInstanceId, processJson, globalParams, scheduleTime, locations, connects, flag); } @@ -1554,4 +1554,16 @@ public class ProcessDao extends AbstractBaseDao { DateUtils.dateToString(dateInterval.getEndTime()), stateArray); } + + /** + * query user queue by process instance id + * @param processInstanceId + * @return + */ + public String queryQueueByProcessInstanceId(int processInstanceId){ + return userMapper.queryQueueByProcessInstanceId(processInstanceId); + } + + + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java index d33fb9ebfb..cd74aa9970 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java @@ -222,4 +222,13 @@ public interface UserMapper { }) @SelectProvider(type = UserMapperProvider.class, method = "queryTenantCodeByUserId") User queryTenantCodeByUserId(@Param("userId") int userId); + + + /** + * query user queue by process instance id + * @param processInstanceId + * @return + */ + @SelectProvider(type = UserMapperProvider.class, method = "queryQueueByProcessInstanceId") + String queryQueueByProcessInstanceId(@Param("processInstanceId") int processInstanceId); } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java index d1cfc34ad9..d3f3f677bd 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java @@ -47,6 +47,7 @@ public class UserMapperProvider { VALUES("`phone`", "#{user.phone}"); VALUES("`user_type`", EnumFieldUtil.genFieldStr("user.userType", UserType.class)); VALUES("`tenant_id`", "#{user.tenantId}"); + VALUES("`queue`", "#{user.queue}"); VALUES("`create_time`", "#{user.createTime}"); VALUES("`update_time`", "#{user.updateTime}"); } @@ -86,6 +87,7 @@ public class UserMapperProvider { SET("`phone`=#{user.phone}"); SET("`user_type`="+EnumFieldUtil.genFieldStr("user.userType", UserType.class)); SET("`tenant_id`=#{user.tenantId}"); + SET("`queue`=#{user.queue}"); SET("`create_time`=#{user.createTime}"); SET("`update_time`=#{user.updateTime}"); @@ -247,4 +249,20 @@ public class UserMapperProvider { }.toString(); } + + /** + * query tenant code by user id + * @param parameter + * @return + */ + public String queryQueueByProcessInstanceId(Map parameter) { + return new SQL() { + { + SELECT("queue"); + FROM(TABLE_NAME + " u,t_escheduler_process_instance p"); + WHERE("u.id = p.executor_id and p.id=#{processInstanceId}"); + } + }.toString(); + } + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/User.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/User.java index 7fda405ef2..6f831fbd96 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/User.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/User.java @@ -79,6 +79,12 @@ public class User { * alert group */ private String alertGroup; + + /** + * user specified queue + */ + private String queue; + /** * create time */ @@ -194,23 +200,12 @@ public class User { this.tenantCode = tenantCode; } - @Override - public String toString() { - return "User{" + - "id=" + id + - ", userName='" + userName + '\'' + - ", userPassword='" + userPassword + '\'' + - ", email='" + email + '\'' + - ", phone='" + phone + '\'' + - ", userType=" + userType + - ", tenantId=" + tenantId + - ", tenantCode='" + tenantCode + '\'' + - ", tenantName='" + tenantName + '\'' + - ", queueName='" + queueName + '\'' + - ", alertGroup='" + alertGroup + '\'' + - ", createTime=" + createTime + - ", updateTime=" + updateTime + - '}'; + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; } @Override @@ -237,4 +232,24 @@ public class User { result = 31 * result + userName.hashCode(); return result; } + + @Override + public String toString() { + return "User{" + + "id=" + id + + ", userName='" + userName + '\'' + + ", userPassword='" + userPassword + '\'' + + ", email='" + email + '\'' + + ", phone='" + phone + '\'' + + ", userType=" + userType + + ", tenantId=" + tenantId + + ", tenantCode='" + tenantCode + '\'' + + ", tenantName='" + tenantName + '\'' + + ", queueName='" + queueName + '\'' + + ", alertGroup='" + alertGroup + '\'' + + ", queue='" + queue + '\'' + + ", createTime=" + createTime + + ", updateTime=" + updateTime + + '}'; + } } diff --git a/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/UserMapperTest.java b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/UserMapperTest.java index 94ae0ca175..c8e5584236 100644 --- a/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/UserMapperTest.java +++ b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/UserMapperTest.java @@ -60,4 +60,10 @@ public class UserMapperTest { } + @Test + public void queryQueueByProcessInstanceId(){ + String queue = userMapper.queryQueueByProcessInstanceId(41388); + Assert.assertEquals(queue, "ait"); + } + } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index 016607c79e..f381b9ef66 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -36,6 +36,7 @@ import cn.escheduler.server.worker.task.AbstractTask; import cn.escheduler.server.worker.task.TaskManager; import cn.escheduler.server.worker.task.TaskProps; import com.alibaba.fastjson.JSONObject; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,12 +155,18 @@ public class TaskScheduleThread implements Callable { taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); + String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId()); + taskProps.setScheduleTime(processInstance.getScheduleTime()); taskProps.setNodeName(taskInstance.getName()); taskProps.setTaskInstId(taskInstance.getId()); taskProps.setEnvFile(CommonUtils.getSystemEnvPath()); // set queue - taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); + if (StringUtils.isEmpty(queue)){ + taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); + }else { + taskProps.setQueue(queue); + } taskProps.setTaskStartTime(taskInstance.getStartTime()); taskProps.setDefinedParams(allParamMap); @@ -188,7 +195,7 @@ public class TaskScheduleThread implements Callable { task.handle(); - logger.info("task : {} exit status code : {}",taskProps.getTaskAppId(),task.getExitStatusCode()); + logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode()); if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ status = ExecutionStatus.SUCCESS;