Browse Source

user-specified queue function development

pull/2/head
qiaozhanwei 5 years ago
parent
commit
2888907376
  1. 14
      escheduler-api/src/main/java/cn/escheduler/api/controller/UsersController.java
  2. 13
      escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java
  3. 1
      escheduler-dao/readme.txt
  4. 20
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  5. 9
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java
  6. 18
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java
  7. 49
      escheduler-dao/src/main/java/cn/escheduler/dao/model/User.java
  8. 6
      escheduler-dao/src/test/java/cn/escheduler/dao/mapper/UserMapperTest.java
  9. 9
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

14
escheduler-api/src/main/java/cn/escheduler/api/controller/UsersController.java

@ -64,13 +64,14 @@ public class UsersController extends BaseController{
@RequestParam(value = "userName") String userName, @RequestParam(value = "userName") String userName,
@RequestParam(value = "userPassword") String userPassword, @RequestParam(value = "userPassword") String userPassword,
@RequestParam(value = "tenantId") int tenantId, @RequestParam(value = "tenantId") int tenantId,
@RequestParam(value = "queue") String queue,
@RequestParam(value = "email") String email, @RequestParam(value = "email") String email,
@RequestParam(value = "phone", required = false) String phone) { @RequestParam(value = "phone", required = false) String phone) {
logger.info("login user {}, create user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, proxyUsers: {}", logger.info("login user {}, create user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}",
loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone); loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone,queue);
try { try {
Map<String, Object> result = usersService.createUser(loginUser, userName, userPassword, email, tenantId, phone); Map<String, Object> result = usersService.createUser(loginUser, userName, userPassword,email,tenantId, phone,queue);
return returnDataList(result); return returnDataList(result);
}catch (Exception e){ }catch (Exception e){
logger.error(CREATE_USER_ERROR.getMsg(),e); logger.error(CREATE_USER_ERROR.getMsg(),e);
@ -127,13 +128,14 @@ public class UsersController extends BaseController{
@RequestParam(value = "id") int id, @RequestParam(value = "id") int id,
@RequestParam(value = "userName") String userName, @RequestParam(value = "userName") String userName,
@RequestParam(value = "userPassword") String userPassword, @RequestParam(value = "userPassword") String userPassword,
@RequestParam(value = "queue") String queue,
@RequestParam(value = "email") String email, @RequestParam(value = "email") String email,
@RequestParam(value = "tenantId") int tenantId, @RequestParam(value = "tenantId") int tenantId,
@RequestParam(value = "phone", required = false) String phone) { @RequestParam(value = "phone", required = false) String phone) {
logger.info("login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, proxyUsers: {}", logger.info("login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}",
loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone); loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone,queue);
try { try {
Map<String, Object> result = usersService.updateUser(id,userName,userPassword,email,tenantId,phone); Map<String, Object> result = usersService.updateUser(id,userName,userPassword,email,tenantId,phone,queue);
return returnDataList(result); return returnDataList(result);
}catch (Exception e){ }catch (Exception e){
logger.error(UPDATE_USER_ERROR.getMsg(),e); logger.error(UPDATE_USER_ERROR.getMsg(),e);

13
escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java

@ -87,7 +87,8 @@ public class UsersService extends BaseService {
String userPassword, String userPassword,
String email, String email,
int tenantId, int tenantId,
String phone) throws Exception { String phone,
String queue) throws Exception {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
result = CheckUtils.checkUserParams(userName, userPassword, email, phone); result = CheckUtils.checkUserParams(userName, userPassword, email, phone);
@ -114,6 +115,7 @@ public class UsersService extends BaseService {
user.setUserType(UserType.GENERAL_USER); user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(now); user.setCreateTime(now);
user.setUpdateTime(now); user.setUpdateTime(now);
user.setQueue(queue);
// save user // save user
userMapper.insert(user); userMapper.insert(user);
@ -194,7 +196,13 @@ public class UsersService extends BaseService {
* @param phone * @param phone
* @return * @return
*/ */
public Map<String, Object> updateUser(int userId, String userName, String userPassword, String email, int tenantId, String phone) throws Exception { public Map<String, Object> updateUser(int userId,
String userName,
String userPassword,
String email,
int tenantId,
String phone,
String queue) throws Exception {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
result.put(Constants.STATUS, false); result.put(Constants.STATUS, false);
@ -218,6 +226,7 @@ public class UsersService extends BaseService {
if (StringUtils.isNotEmpty(email)) { if (StringUtils.isNotEmpty(email)) {
user.setEmail(email); user.setEmail(email);
} }
user.setQueue(queue);
user.setPhone(phone); user.setPhone(phone);
user.setUpdateTime(now); user.setUpdateTime(now);

1
escheduler-dao/readme.txt

@ -0,0 +1 @@
alter table t_escheduler_user add queue varchar(64);

20
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -59,7 +59,7 @@ public class ProcessDao extends AbstractBaseDao {
ExecutionStatus.READY_STOP.ordinal()}; ExecutionStatus.READY_STOP.ordinal()};
@Autowired @Autowired
private ProjectMapper projectMapper; private UserMapper userMapper;
@Autowired @Autowired
private ProcessDefinitionMapper processDefineMapper; private ProcessDefinitionMapper processDefineMapper;
@ -102,7 +102,7 @@ public class ProcessDao extends AbstractBaseDao {
*/ */
@Override @Override
protected void init() { protected void init() {
projectMapper = getMapper(ProjectMapper.class); userMapper=getMapper(UserMapper.class);
processDefineMapper = getMapper(ProcessDefinitionMapper.class); processDefineMapper = getMapper(ProcessDefinitionMapper.class);
processInstanceMapper = getMapper(ProcessInstanceMapper.class); processInstanceMapper = getMapper(ProcessInstanceMapper.class);
dataSourceMapper = getMapper(DataSourceMapper.class); dataSourceMapper = getMapper(DataSourceMapper.class);
@ -261,7 +261,7 @@ public class ProcessDao extends AbstractBaseDao {
public ProcessInstance findProcessInstanceByScheduleTime(int defineId, Date scheduleTime){ public ProcessInstance findProcessInstanceByScheduleTime(int defineId, Date scheduleTime){
return processInstanceMapper.queryByScheduleTime(defineId, return processInstanceMapper.queryByScheduleTime(defineId,
DateUtils.dateToString(scheduleTime), 0,null, null); DateUtils.dateToString(scheduleTime), 0, null, null);
} }
/** /**
@ -1210,7 +1210,7 @@ public class ProcessDao extends AbstractBaseDao {
public int updateProcessInstance(Integer processInstanceId, String processJson, public int updateProcessInstance(Integer processInstanceId, String processJson,
String globalParams, Date scheduleTime, Flag flag, String globalParams, Date scheduleTime, Flag flag,
String locations, String connects){ String locations, String connects){
return processInstanceMapper.updateProcessInstance( processInstanceId, processJson, return processInstanceMapper.updateProcessInstance(processInstanceId, processJson,
globalParams, scheduleTime, locations, connects, flag); globalParams, scheduleTime, locations, connects, flag);
} }
@ -1554,4 +1554,16 @@ public class ProcessDao extends AbstractBaseDao {
DateUtils.dateToString(dateInterval.getEndTime()), DateUtils.dateToString(dateInterval.getEndTime()),
stateArray); stateArray);
} }
/**
* query user queue by process instance id
* @param processInstanceId
* @return
*/
public String queryQueueByProcessInstanceId(int processInstanceId){
return userMapper.queryQueueByProcessInstanceId(processInstanceId);
}
} }

9
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java

@ -222,4 +222,13 @@ public interface UserMapper {
}) })
@SelectProvider(type = UserMapperProvider.class, method = "queryTenantCodeByUserId") @SelectProvider(type = UserMapperProvider.class, method = "queryTenantCodeByUserId")
User queryTenantCodeByUserId(@Param("userId") int userId); User queryTenantCodeByUserId(@Param("userId") int userId);
/**
* query user queue by process instance id
* @param processInstanceId
* @return
*/
@SelectProvider(type = UserMapperProvider.class, method = "queryQueueByProcessInstanceId")
String queryQueueByProcessInstanceId(@Param("processInstanceId") int processInstanceId);
} }

18
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java

@ -47,6 +47,7 @@ public class UserMapperProvider {
VALUES("`phone`", "#{user.phone}"); VALUES("`phone`", "#{user.phone}");
VALUES("`user_type`", EnumFieldUtil.genFieldStr("user.userType", UserType.class)); VALUES("`user_type`", EnumFieldUtil.genFieldStr("user.userType", UserType.class));
VALUES("`tenant_id`", "#{user.tenantId}"); VALUES("`tenant_id`", "#{user.tenantId}");
VALUES("`queue`", "#{user.queue}");
VALUES("`create_time`", "#{user.createTime}"); VALUES("`create_time`", "#{user.createTime}");
VALUES("`update_time`", "#{user.updateTime}"); VALUES("`update_time`", "#{user.updateTime}");
} }
@ -86,6 +87,7 @@ public class UserMapperProvider {
SET("`phone`=#{user.phone}"); SET("`phone`=#{user.phone}");
SET("`user_type`="+EnumFieldUtil.genFieldStr("user.userType", UserType.class)); SET("`user_type`="+EnumFieldUtil.genFieldStr("user.userType", UserType.class));
SET("`tenant_id`=#{user.tenantId}"); SET("`tenant_id`=#{user.tenantId}");
SET("`queue`=#{user.queue}");
SET("`create_time`=#{user.createTime}"); SET("`create_time`=#{user.createTime}");
SET("`update_time`=#{user.updateTime}"); SET("`update_time`=#{user.updateTime}");
@ -247,4 +249,20 @@ public class UserMapperProvider {
}.toString(); }.toString();
} }
/**
* query tenant code by user id
* @param parameter
* @return
*/
public String queryQueueByProcessInstanceId(Map<String, Object> parameter) {
return new SQL() {
{
SELECT("queue");
FROM(TABLE_NAME + " u,t_escheduler_process_instance p");
WHERE("u.id = p.executor_id and p.id=#{processInstanceId}");
}
}.toString();
}
} }

49
escheduler-dao/src/main/java/cn/escheduler/dao/model/User.java

@ -79,6 +79,12 @@ public class User {
* alert group * alert group
*/ */
private String alertGroup; private String alertGroup;
/**
* user specified queue
*/
private String queue;
/** /**
* create time * create time
*/ */
@ -194,23 +200,12 @@ public class User {
this.tenantCode = tenantCode; this.tenantCode = tenantCode;
} }
@Override public String getQueue() {
public String toString() { return queue;
return "User{" + }
"id=" + id +
", userName='" + userName + '\'' + public void setQueue(String queue) {
", userPassword='" + userPassword + '\'' + this.queue = queue;
", email='" + email + '\'' +
", phone='" + phone + '\'' +
", userType=" + userType +
", tenantId=" + tenantId +
", tenantCode='" + tenantCode + '\'' +
", tenantName='" + tenantName + '\'' +
", queueName='" + queueName + '\'' +
", alertGroup='" + alertGroup + '\'' +
", createTime=" + createTime +
", updateTime=" + updateTime +
'}';
} }
@Override @Override
@ -237,4 +232,24 @@ public class User {
result = 31 * result + userName.hashCode(); result = 31 * result + userName.hashCode();
return result; return result;
} }
@Override
public String toString() {
return "User{" +
"id=" + id +
", userName='" + userName + '\'' +
", userPassword='" + userPassword + '\'' +
", email='" + email + '\'' +
", phone='" + phone + '\'' +
", userType=" + userType +
", tenantId=" + tenantId +
", tenantCode='" + tenantCode + '\'' +
", tenantName='" + tenantName + '\'' +
", queueName='" + queueName + '\'' +
", alertGroup='" + alertGroup + '\'' +
", queue='" + queue + '\'' +
", createTime=" + createTime +
", updateTime=" + updateTime +
'}';
}
} }

6
escheduler-dao/src/test/java/cn/escheduler/dao/mapper/UserMapperTest.java

@ -60,4 +60,10 @@ public class UserMapperTest {
} }
@Test
public void queryQueueByProcessInstanceId(){
String queue = userMapper.queryQueueByProcessInstanceId(41388);
Assert.assertEquals(queue, "ait");
}
} }

9
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

@ -36,6 +36,7 @@ import cn.escheduler.server.worker.task.AbstractTask;
import cn.escheduler.server.worker.task.TaskManager; import cn.escheduler.server.worker.task.TaskManager;
import cn.escheduler.server.worker.task.TaskProps; import cn.escheduler.server.worker.task.TaskProps;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -154,12 +155,18 @@ public class TaskScheduleThread implements Callable<Boolean> {
taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId());
taskProps.setScheduleTime(processInstance.getScheduleTime()); taskProps.setScheduleTime(processInstance.getScheduleTime());
taskProps.setNodeName(taskInstance.getName()); taskProps.setNodeName(taskInstance.getName());
taskProps.setTaskInstId(taskInstance.getId()); taskProps.setTaskInstId(taskInstance.getId());
taskProps.setEnvFile(CommonUtils.getSystemEnvPath()); taskProps.setEnvFile(CommonUtils.getSystemEnvPath());
// set queue // set queue
if (StringUtils.isEmpty(queue)){
taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); taskProps.setQueue(taskInstance.getProcessInstance().getQueue());
}else {
taskProps.setQueue(queue);
}
taskProps.setTaskStartTime(taskInstance.getStartTime()); taskProps.setTaskStartTime(taskInstance.getStartTime());
taskProps.setDefinedParams(allParamMap); taskProps.setDefinedParams(allParamMap);
@ -188,7 +195,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
task.handle(); task.handle();
logger.info("task : {} exit status code : {}",taskProps.getTaskAppId(),task.getExitStatusCode()); logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode());
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
status = ExecutionStatus.SUCCESS; status = ExecutionStatus.SUCCESS;

Loading…
Cancel
Save