Browse Source

command and task monitor dev

pull/2/head
qiaozhanwei 6 years ago
parent
commit
fa804e7abe
  1. 56
      escheduler-api/src/main/java/cn/escheduler/api/controller/DataAnalysisController.java
  2. 5
      escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java
  3. 11
      escheduler-api/src/main/java/cn/escheduler/api/interceptor/LoginHandlerInterceptor.java
  4. 175
      escheduler-api/src/main/java/cn/escheduler/api/service/DataAnalysisService.java
  5. 16
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapper.java
  6. 29
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapperProvider.java
  7. 14
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapper.java
  8. 28
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapperProvider.java
  9. 14
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapper.java
  10. 38
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java
  11. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java
  12. 8
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ExecuteStatusCount.java

56
escheduler-api/src/main/java/cn/escheduler/api/controller/DataAnalysisController.java

@ -57,8 +57,7 @@ public class DataAnalysisController extends BaseController{
public Result countTaskState(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value="startDate", required=false) String startDate,
@RequestParam(value="endDate", required=false) String endDate,
@RequestParam(value="projectId", required=false, defaultValue = "0") int projectId
){
@RequestParam(value="projectId", required=false, defaultValue = "0") int projectId){
try{
logger.info("count task state, user:{}, start date: {}, end date:{}, project id {}",
loginUser.getUserName(), startDate, endDate, projectId);
@ -82,8 +81,7 @@ public class DataAnalysisController extends BaseController{
public Result countProcessInstanceState(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value="startDate", required=false) String startDate,
@RequestParam(value="endDate", required=false) String endDate,
@RequestParam(value="projectId", required=false, defaultValue = "0") int projectId
){
@RequestParam(value="projectId", required=false, defaultValue = "0") int projectId){
try{
logger.info("count process instance state, user:{}, start date: {}, end date:{}, project id",
loginUser.getUserName(), startDate, endDate, projectId);
@ -105,8 +103,7 @@ public class DataAnalysisController extends BaseController{
@GetMapping(value="/define-user-count")
@ResponseStatus(HttpStatus.OK)
public Result countDefinitionByUser(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value="projectId", required=false, defaultValue = "0") int projectId
){
@RequestParam(value="projectId", required=false, defaultValue = "0") int projectId){
try{
logger.info("count process definition , user:{}, project id",
loginUser.getUserName(), projectId);
@ -119,4 +116,51 @@ public class DataAnalysisController extends BaseController{
}
/**
* statistical command status data
*
* @param loginUser
* @param projectId
* @return
*/
@GetMapping(value="/command-state-count")
@ResponseStatus(HttpStatus.OK)
public Result countCommandState(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value="startDate", required=false) String startDate,
@RequestParam(value="endDate", required=false) String endDate,
@RequestParam(value="projectId", required=false, defaultValue = "0") int projectId){
try{
logger.info("count command state, user:{}, start date: {}, end date:{}, project id {}",
loginUser.getUserName(), startDate, endDate, projectId);
Map<String, Object> result = dataAnalysisService.countCommandState(loginUser, projectId, startDate, endDate);
return returnDataList(result);
}catch (Exception e){
logger.error(COMMAND_STATE_COUNT_ERROR.getMsg(),e);
return error(COMMAND_STATE_COUNT_ERROR.getCode(), COMMAND_STATE_COUNT_ERROR.getMsg());
}
}
/**
* queue count
*
* @param loginUser
* @param projectId
* @return
*/
@GetMapping(value="/queue-count")
@ResponseStatus(HttpStatus.OK)
public Result countQueueState(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value="projectId", required=false, defaultValue = "0") int projectId){
try{
logger.info("count command state, user:{}, start date: {}, end date:{}, project id {}",
loginUser.getUserName(), projectId);
Map<String, Object> result = dataAnalysisService.countQueueState(loginUser, projectId);
return returnDataList(result);
}catch (Exception e){
logger.error(QUEUE_COUNT_ERROR.getMsg(),e);
return error(QUEUE_COUNT_ERROR.getCode(), QUEUE_COUNT_ERROR.getMsg());
}
}
}

5
escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java

@ -205,6 +205,11 @@ public enum Status {
GENERATE_TOKEN_ERROR(70002,"generate token error"),
QUERY_ACCESSTOKEN_LIST_PAGING_ERROR(70003,"query access token list paging error"),
COMMAND_STATE_COUNT_ERROR(80001,"task instance state count error"),
QUEUE_COUNT_ERROR(90001,"queue count error"),
;
private int code;

11
escheduler-api/src/main/java/cn/escheduler/api/interceptor/LoginHandlerInterceptor.java

@ -79,9 +79,6 @@ public class LoginHandlerInterceptor implements HandlerInterceptor {
//get user object from session
user = userMapper.queryById(session.getUserId());
}else {
user = userMapper.queryUserByToken(token);
}
// if user is null
if (user == null) {
@ -89,6 +86,14 @@ public class LoginHandlerInterceptor implements HandlerInterceptor {
logger.info("user does not exist");
return false;
}
}else {
user = userMapper.queryUserByToken(token);
if (user == null) {
response.setStatus(HttpStatus.SC_UNAUTHORIZED);
logger.info("user token has expired");
return false;
}
}
request.setAttribute(Constants.SESSION_USER, user);
return true;
}

175
escheduler-api/src/main/java/cn/escheduler/api/service/DataAnalysisService.java

@ -21,15 +21,17 @@ import cn.escheduler.api.dto.DefineUserDto;
import cn.escheduler.api.dto.TaskCountDto;
import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.UserType;
import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.dao.mapper.ProcessDefinitionMapper;
import cn.escheduler.dao.mapper.ProcessInstanceMapper;
import cn.escheduler.dao.mapper.ProjectMapper;
import cn.escheduler.dao.mapper.TaskInstanceMapper;
import cn.escheduler.dao.mapper.*;
import cn.escheduler.dao.model.DefinitionGroupByUser;
import cn.escheduler.dao.model.ExecuteStatusCount;
import cn.escheduler.dao.model.Project;
import cn.escheduler.dao.model.User;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -55,15 +57,21 @@ public class DataAnalysisService {
@Autowired
ProjectService projectService;
@Autowired
TaskInstanceMapper taskInstanceMapper;
@Autowired
ProcessInstanceMapper processInstanceMapper;
@Autowired
ProcessDefinitionMapper processDefinitionMapper;
@Autowired
CommandMapper commandMapper;
@Autowired
ErrorCommandMapper errorCommandMapper;
@Autowired
TaskInstanceMapper taskInstanceMapper;
/**
* statistical task instance status data
*
@ -206,4 +214,157 @@ public class DataAnalysisService {
}
return false;
}
/**
* statistical command status data
*
* @param loginUser
* @param projectId
* @param startDate
* @param endDate
* @return
*/
public Map<String, Object> countCommandState(User loginUser, int projectId, String startDate, String endDate) {
Map<String, Object> result = new HashMap<>(5);
if(projectId != 0){
Project project = projectMapper.queryById(projectId);
result = projectService.checkProjectAndAuth(loginUser, project, String.valueOf(projectId));
if (getResultStatus(result)){
return result;
}
}
/**
* find all the task lists in the project under the user
* statistics based on task status execution, failure, completion, wait, total
*/
Date start = null;
Date end = null;
try {
start = DateUtils.getScheduleDate(startDate);
end = DateUtils.getScheduleDate(endDate);
} catch (Exception e) {
logger.error(e.getMessage(),e);
putErrorRequestParamsMsg(result);
return result;
}
// count command state
List<ExecuteStatusCount> commandStateCounts =
commandMapper.countCommandState(loginUser.getId(),
loginUser.getUserType(), start, end, projectId);
// count error command state
List<ExecuteStatusCount> errorCommandStateCounts =
errorCommandMapper.countCommandState(loginUser.getId(),
loginUser.getUserType(), start, end, projectId);
//
Map<ExecutionStatus,Map<String,Integer>> dataMap = new HashMap<>();
Map<String,Integer> commonCommand = new HashMap<>();
commonCommand.put("commandState",0);
commonCommand.put("errorCommandState",0);
// init data map
dataMap.put(ExecutionStatus.SUBMITTED_SUCCESS,commonCommand);
dataMap.put(ExecutionStatus.RUNNING_EXEUTION,commonCommand);
dataMap.put(ExecutionStatus.READY_PAUSE,commonCommand);
dataMap.put(ExecutionStatus.PAUSE,commonCommand);
dataMap.put(ExecutionStatus.READY_STOP,commonCommand);
dataMap.put(ExecutionStatus.STOP,commonCommand);
dataMap.put(ExecutionStatus.FAILURE,commonCommand);
dataMap.put(ExecutionStatus.SUCCESS,commonCommand);
dataMap.put(ExecutionStatus.NEED_FAULT_TOLERANCE,commonCommand);
dataMap.put(ExecutionStatus.KILL,commonCommand);
dataMap.put(ExecutionStatus.WAITTING_THREAD,commonCommand);
dataMap.put(ExecutionStatus.WAITTING_DEPEND,commonCommand);
// put command state
for (ExecuteStatusCount executeStatusCount : commandStateCounts){
Map<String,Integer> commandStateCountsMap = new HashMap<>(dataMap.get(executeStatusCount.getExecutionStatus()));
commandStateCountsMap.put("commandState", executeStatusCount.getCount());
dataMap.put(executeStatusCount.getExecutionStatus(),commandStateCountsMap);
}
// put error command state
for (ExecuteStatusCount errorExecutionStatus : errorCommandStateCounts){
Map<String,Integer> errorCommandStateCountsMap = new HashMap<>(dataMap.get(errorExecutionStatus.getExecutionStatus()));
errorCommandStateCountsMap.put("errorCommandState",errorExecutionStatus.getCount());
dataMap.put(errorExecutionStatus.getExecutionStatus(),errorCommandStateCountsMap);
}
result.put(Constants.DATA_LIST, dataMap);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* count queue state
* @param loginUser
* @param projectId
* @return
*/
public Map<String, Object> countQueueState(User loginUser, int projectId) {
Map<String, Object> result = new HashMap<>(5);
if(projectId != 0){
Project project = projectMapper.queryById(projectId);
result = projectService.checkProjectAndAuth(loginUser, project, String.valueOf(projectId));
if (getResultStatus(result)){
return result;
}
}
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
List<String> tasksQueueList = tasksQueue.getAllTasks(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE);
List<String> tasksKillList = tasksQueue.getAllTasks(cn.escheduler.common.Constants.SCHEDULER_TASKS_KILL);
Map<String,Integer> dataMap = new HashMap<>();
if (loginUser.getUserType() == UserType.ADMIN_USER){
dataMap.put("taskQueue",tasksQueueList.size());
dataMap.put("taskKill",tasksKillList.size());
result.put(Constants.DATA_LIST, dataMap);
putMsg(result, Status.SUCCESS);
return result;
}
int[] tasksQueueIds = new int[tasksQueueList.size()];
int[] tasksKillIds = new int[tasksKillList.size()];
int i =0;
for (String taskQueueStr : tasksQueueList){
if (StringUtils.isNotEmpty(taskQueueStr)){
String[] splits = taskQueueStr.split("_");
if (splits.length == 4){
tasksQueueIds[i++]=Integer.parseInt(splits[3]);
}
}
}
i = 0;
for (String taskKillStr : tasksKillList){
if (StringUtils.isNotEmpty(taskKillStr)){
String[] splits = taskKillStr.split("_");
if (splits.length == 2){
tasksKillIds[i++]=Integer.parseInt(splits[1]);
}
}
}
Integer taskQueueCount = taskInstanceMapper.countTask(loginUser.getId(),loginUser.getUserType(),projectId, tasksQueueIds);
Integer taskKillCount = taskInstanceMapper.countTask(loginUser.getId(),loginUser.getUserType(),projectId, tasksQueueIds);
dataMap.put("taskQueue",taskQueueCount);
dataMap.put("taskKill",taskKillCount);
result.put(Constants.DATA_LIST, dataMap);
putMsg(result, Status.SUCCESS);
return result;
}
}

16
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapper.java

@ -18,12 +18,15 @@ package cn.escheduler.dao.mapper;
import cn.escheduler.common.enums.*;
import cn.escheduler.dao.model.Command;
import cn.escheduler.dao.model.ExecuteStatusCount;
import org.apache.ibatis.annotations.*;
import org.apache.ibatis.type.EnumOrdinalTypeHandler;
import org.apache.ibatis.type.JdbcType;
import java.sql.Timestamp;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* command mapper
@ -103,4 +106,17 @@ public interface CommandMapper {
@SelectProvider(type = CommandMapperProvider.class, method = "queryAllCommand")
List<Command> queryAllCommand();
@Results(value = {
@Result(property = "state", column = "state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ExecutionStatus.class, jdbcType = JdbcType.TINYINT),
@Result(property = "count", column = "count", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
})
@SelectProvider(type = CommandMapperProvider.class, method = "countCommandState")
List<ExecuteStatusCount> countCommandState(
@Param("userId") int userId,
@Param("userType") UserType userType,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("projectId") int projectId);
}

29
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapperProvider.java

@ -139,6 +139,35 @@ public class CommandMapperProvider {
}.toString();
}
/**
*
* count command type
* @param parameter
* @return
*/
public String countCommandState(Map<String, Object> parameter){
return new SQL(){
{
SELECT ("command_type as state,COUNT(*) AS count");
FROM(TABLE_NAME + " cmd,t_escheduler_process_definition process");
WHERE("cmd.process_definition_id = process.id");
if(parameter.get("projectId") != null && (int)parameter.get("projectId") != 0){
WHERE( "process.project_id = #{projectId} ");
}else{
if(parameter.get("userType") != null && String.valueOf(parameter.get("userType")) == "GENERAL_USER") {
AND();
WHERE("process.project_id in (select id as project_id from t_escheduler_project tp where tp.user_id= #{userId} " +
"union select project_id from t_escheduler_relation_project_user tr where tr.user_id= #{userId} )");
}
}
WHERE("cmd.start_time >= #{startTime} and cmd.update_time <= #{endTime}");
GROUP_BY("cmd.command_type");
}
}.toString();
}
}

14
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapper.java

@ -19,11 +19,13 @@ package cn.escheduler.dao.mapper;
import cn.escheduler.common.enums.*;
import cn.escheduler.dao.model.Command;
import cn.escheduler.dao.model.ErrorCommand;
import cn.escheduler.dao.model.ExecuteStatusCount;
import org.apache.ibatis.annotations.*;
import org.apache.ibatis.type.EnumOrdinalTypeHandler;
import org.apache.ibatis.type.JdbcType;
import java.sql.Timestamp;
import java.util.Date;
import java.util.List;
/**
@ -41,5 +43,17 @@ public interface ErrorCommandMapper {
@SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "errorCommand.id", before = false, resultType = int.class)
int insert(@Param("errorCommand") ErrorCommand errorCommand);
@Results(value = {
@Result(property = "state", column = "state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ExecutionStatus.class, jdbcType = JdbcType.TINYINT),
@Result(property = "count", column = "count", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
})
@SelectProvider(type = ErrorCommandMapperProvider.class, method = "countCommandState")
List<ExecuteStatusCount> countCommandState(
@Param("userId") int userId,
@Param("userType") UserType userType,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("projectId") int projectId);
}

28
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapperProvider.java

@ -38,4 +38,32 @@ public class ErrorCommandMapperProvider {
}
}.toString();
}
/**
*
* count command type
* @param parameter
* @return
*/
public String countCommandState(Map<String, Object> parameter){
return new SQL(){
{
SELECT("command_type as state,COUNT(*) AS count");
FROM(TABLE_NAME + " cmd,t_escheduler_process_definition process");
WHERE("cmd.process_definition_id = process.id");
if(parameter.get("projectId") != null && (int)parameter.get("projectId") != 0){
WHERE( "process.project_id = #{projectId} ");
}else{
if(parameter.get("userType") != null && String.valueOf(parameter.get("userType")) == "GENERAL_USER") {
AND();
WHERE("process.project_id in (select id as project_id from t_escheduler_project tp where tp.user_id= #{userId} " +
"union select project_id from t_escheduler_relation_project_user tr where tr.user_id= #{userId} )");
}
}
WHERE("cmd.start_time >= #{startTime} and cmd.update_time <= #{endTime}");
GROUP_BY("cmd.command_type");
}
}.toString();
}
}

14
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapper.java

@ -304,4 +304,18 @@ public interface TaskInstanceMapper {
@SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryByInstanceIdAndName")
TaskInstance queryByInstanceIdAndName(@Param("processInstanceId") int processInstanceId,
@Param("name") String name);
/**
* count task
* @param userId
* @param userType
* @param projectId
* @return
*/
@SelectProvider(type = TaskInstanceMapperProvider.class, method = "countTask")
Integer countTask(@Param("userId") int userId,
@Param("userType") UserType userType,
@Param("projectId") int projectId,
@Param("taskIds") int[] taskIds);
}

38
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java

@ -404,4 +404,42 @@ public class TaskInstanceMapperProvider {
}
/**
*
* count task
* @param parameter
* @return
*/
public String countTask(Map<String, Object> parameter){
StringBuffer taskIdsStr = new StringBuffer();
int[] stateArray = (int[]) parameter.get("taskIds");
for(int i=0;i<stateArray.length;i++){
taskIdsStr.append(stateArray[i]);
if(i<stateArray.length-1){
taskIdsStr.append(",");
}
}
return new SQL(){
{
SELECT("count(1) as count");
FROM(TABLE_NAME + " task,t_escheduler_process_definition process");
WHERE("task.process_definition_id=process.id");
if(parameter.get("projectId") != null && (int)parameter.get("projectId") != 0){
WHERE( "process.project_id = #{projectId} ");
}else{
if(parameter.get("userType") != null && String.valueOf(parameter.get("userType")) == "GENERAL_USER") {
AND();
WHERE("process.project_id in (select id as project_id from t_escheduler_project tp where tp.user_id= #{userId} " +
"union select project_id from t_escheduler_relation_project_user tr where tr.user_id= #{userId} )");
}
}
WHERE("task.id in (" + taskIdsStr.toString() + ")");
}
}.toString();
}
}

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java

@ -276,7 +276,7 @@ public class UserMapperProvider {
{
SELECT("u.*");
FROM(TABLE_NAME + " u ,t_escheduler_access_token t");
WHERE(" u.id = t.user_id and token=#{token}");
WHERE(" u.id = t.user_id and token=#{token} and t.expire_time > NOW()");
}
}.toString();
}

8
escheduler-dao/src/main/java/cn/escheduler/dao/model/ExecuteStatusCount.java

@ -50,4 +50,12 @@ public class ExecuteStatusCount {
public void setCount(int count) {
this.count = count;
}
@Override
public String toString() {
return "ExecuteStatusCount{" +
"state=" + state +
", count=" + count +
'}';
}
}

Loading…
Cancel
Save