From d62091f9deb2f8a31745fd50a6add66d1eccec54 Mon Sep 17 00:00:00 2001 From: baoliang Date: Mon, 15 Apr 2019 18:15:10 +0800 Subject: [PATCH 1/3] add history task record --- .../cn/escheduler/api/service/TaskRecordService.java | 7 +++++-- .../src/main/java/cn/escheduler/common/Constants.java | 9 +++++++++ .../src/main/java/cn/escheduler/dao/TaskRecordDao.java | 10 ++++++---- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java index b5a93a390e..7c6ea38834 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java @@ -29,6 +29,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static cn.escheduler.common.Constants.*; + /** * task record service */ @@ -69,8 +71,9 @@ public class TaskRecordService extends BaseService{ map.put("offset", pageInfo.getStart().toString()); map.put("pageSize", pageInfo.getPageSize().toString()); - int count = TaskRecordDao.countTaskRecord(map); - List recordList = TaskRecordDao.queryAllTaskRecord(map); + String table =TASK_RECORD_TABLE_HIVE_LOG; + int count = TaskRecordDao.countTaskRecord(map, table); + List recordList = TaskRecordDao.queryAllTaskRecord(map, table); pageInfo.setTotalCount(count); pageInfo.setLists(recordList); result.put(Constants.DATA_LIST, pageInfo); diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index 98a2535f03..4e9a11c841 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -463,6 +463,10 @@ public final class Constants { public static final String TASK_RECORD_PWD = "task.record.datasource.password"; + public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd"; + + public static String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd"; + public static final String STATUS = "status"; @@ -826,4 +830,9 @@ public final class Constants { public static final String CONTENT = "content"; public static final String DEPENDENT_SPLIT = ":||"; public static final String DEPENDENT_ALL = "ALL"; + + + /** + * + */ } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java index c2c82b77f8..555f3a6d18 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java @@ -40,6 +40,8 @@ public class TaskRecordDao { private static Logger logger = LoggerFactory.getLogger(TaskRecordDao.class.getName()); + + /** * 加载配置文件 */ @@ -134,7 +136,7 @@ public class TaskRecordDao { * @param filterMap * @return */ - public static int countTaskRecord(Map filterMap){ + public static int countTaskRecord(Map filterMap, String table){ int count = 0; Connection conn = null; @@ -143,7 +145,7 @@ public class TaskRecordDao { if(conn == null){ return count; } - String sql = "select count(1) as count from eamp_hive_log_hd"; + String sql = String.format("select count(1) as count from %s", table); sql += getWhereString(filterMap); PreparedStatement pstmt; pstmt = conn.prepareStatement(sql); @@ -171,9 +173,9 @@ public class TaskRecordDao { * @param filterMap * @return */ - public static List queryAllTaskRecord(Map filterMap ) { + public static List queryAllTaskRecord(Map filterMap , String table) { - String sql = "select * from eamp_hive_log_hd "; + String sql = String.format("select * from %s", table); sql += getWhereString(filterMap); int offset = Integer.parseInt(filterMap.get("offset")); From 7837a3fe1932aaeceb7aa756a167bcd0c6e8b42b Mon Sep 17 00:00:00 2001 From: baoliang Date: Mon, 15 Apr 2019 19:45:49 +0800 Subject: [PATCH 2/3] add history task record --- .../api/controller/TaskRecordController.java | 34 ++++++++++++++++++- .../api/service/TaskRecordService.java | 4 +-- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/TaskRecordController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/TaskRecordController.java index 7c81341b95..72d7c8dbfd 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/TaskRecordController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/TaskRecordController.java @@ -68,7 +68,7 @@ public class TaskRecordController extends BaseController{ try{ logger.info("query task record list, task name:{}, state :{}, taskDate: {}, start:{}, end:{}", taskName, state, taskDate, startTime, endTime); - Map result = taskRecordService.queryTaskRecordListPaging(taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize); + Map result = taskRecordService.queryTaskRecordListPaging(false, taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize); return returnDataListPaging(result); }catch (Exception e){ logger.error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg(),e); @@ -77,4 +77,36 @@ public class TaskRecordController extends BaseController{ } + /** + * query history task record list paging + * + * @param loginUser + * @return + */ + @GetMapping("/history-list-paging") + @ResponseStatus(HttpStatus.OK) + public Result queryHistoryTaskRecordListPaging(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "taskName", required = false) String taskName, + @RequestParam(value = "state", required = false) String state, + @RequestParam(value = "sourceTable", required = false) String sourceTable, + @RequestParam(value = "destTable", required = false) String destTable, + @RequestParam(value = "taskDate", required = false) String taskDate, + @RequestParam(value = "startDate", required = false) String startTime, + @RequestParam(value = "endDate", required = false) String endTime, + @RequestParam("pageNo") Integer pageNo, + @RequestParam("pageSize") Integer pageSize + ){ + + try{ + logger.info("query hisotry task record list, task name:{}, state :{}, taskDate: {}, start:{}, end:{}", + taskName, state, taskDate, startTime, endTime); + Map result = taskRecordService.queryTaskRecordListPaging(true, taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize); + return returnDataListPaging(result); + }catch (Exception e){ + logger.error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg(),e); + return error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getCode(), QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg()); + } + + } + } diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java index 7c6ea38834..52306acd6c 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java @@ -53,7 +53,7 @@ public class TaskRecordService extends BaseService{ * @param pageSize * @return */ - public Map queryTaskRecordListPaging(String taskName, String startDate, + public Map queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate, String taskDate, String sourceTable, String destTable, String endDate, String state, Integer pageNo, Integer pageSize) { @@ -71,7 +71,7 @@ public class TaskRecordService extends BaseService{ map.put("offset", pageInfo.getStart().toString()); map.put("pageSize", pageInfo.getPageSize().toString()); - String table =TASK_RECORD_TABLE_HIVE_LOG; + String table = isHistory ? TASK_RECORD_TABLE_HISTORY_HIVE_LOG : TASK_RECORD_TABLE_HIVE_LOG; int count = TaskRecordDao.countTaskRecord(map, table); List recordList = TaskRecordDao.queryAllTaskRecord(map, table); pageInfo.setTotalCount(count); From f555faffed9311ca7398f8039c4f0d26c6efa229 Mon Sep 17 00:00:00 2001 From: baoliang Date: Tue, 16 Apr 2019 16:51:25 +0800 Subject: [PATCH 3/3] fix bug:command exception. --- .../java/cn/escheduler/dao/ProcessDao.java | 74 +++++++++++-------- 1 file changed, 45 insertions(+), 29 deletions(-) diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 38dce40f81..cf217c95aa 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -120,48 +120,64 @@ public class ProcessDao extends AbstractBaseDao { * find one command from command queue, construct process instance * @param logger * @param host - * @param vaildThreadNum + * @param validThreadNum * @return */ @Transactional(value = "TransactionManager",rollbackFor = Exception.class) - public ProcessInstance scanCommand(Logger logger, String host, int vaildThreadNum){ + public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){ ProcessInstance processInstance = null; Command command = findOneCommand(); - if (command == null) { return null; } logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); - processInstance = constructProcessInstance(command, host); - - //cannot construct process instance, return null; - if(processInstance == null){ - logger.error("scan command, command parameter is error: %s", command.toString()); - }else{ - // check thread number enough for this command, if not, change state to waiting thread. - int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); - if(vaildThreadNum < commandThreadCount){ - logger.info("there is not enough thread for this command: {}",command.toString() ); - processInstance.setState(ExecutionStatus.WAITTING_THREAD); - if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){ - processInstance.addHistoryCmd(command.getCommandType()); - } - saveProcessInstance(processInstance); - this.setSubProcessParam(processInstance); - createRecoveryWaitingThreadCommand(command, processInstance); + try{ + processInstance = constructProcessInstance(command, host); + //cannot construct process instance, return null; + if(processInstance == null){ + logger.error("scan command, command parameter is error: %s", command.toString()); + delCommandByid(command.getId()); return null; + }else if(!checkThreadNum(command, validThreadNum)){ + logger.info("there is not enough thread for this command: {}",command.toString() ); + return setWaitingThreadProcess(command, processInstance); }else{ - processInstance.setCommandType(command.getCommandType()); - processInstance.addHistoryCmd(command.getCommandType()); - saveProcessInstance(processInstance); - this.setSubProcessParam(processInstance); + processInstance.setCommandType(command.getCommandType()); + processInstance.addHistoryCmd(command.getCommandType()); + saveProcessInstance(processInstance); + this.setSubProcessParam(processInstance); + delCommandByid(command.getId()); + return processInstance; } + }catch (Exception e){ + logger.error("scan command error ", e); + delCommandByid(command.getId()); } - // delete command - delCommandByid(command.getId()); - return processInstance; + return null; + } + + /** + * set process waiting thread + * @param command + * @param processInstance + * @return + */ + private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) { + processInstance.setState(ExecutionStatus.WAITTING_THREAD); + if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){ + processInstance.addHistoryCmd(command.getCommandType()); + } + saveProcessInstance(processInstance); + this.setSubProcessParam(processInstance); + createRecoveryWaitingThreadCommand(command, processInstance); + return null; + } + + private boolean checkThreadNum(Command command, int validThreadNum) { + int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); + return validThreadNum >= commandThreadCount; } /** @@ -669,7 +685,7 @@ public class ProcessDao extends AbstractBaseDao { paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId())); processInstance.setCommandParam(JSONUtils.toJson(paramMap)); processInstance.setIsSubProcess(Flag.YES); - this.updateProcessInstance(processInstance); + this.saveProcessInstance(processInstance); } // copy parent instance user def params to sub process.. String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID); @@ -677,7 +693,7 @@ public class ProcessDao extends AbstractBaseDao { ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if(parentInstance != null){ processInstance.setGlobalParams(parentInstance.getGlobalParams()); - this.updateProcessInstance(processInstance); + this.saveProcessInstance(processInstance); }else{ logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); }