diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 38dce40f81..cf217c95aa 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -120,48 +120,64 @@ public class ProcessDao extends AbstractBaseDao { * find one command from command queue, construct process instance * @param logger * @param host - * @param vaildThreadNum + * @param validThreadNum * @return */ @Transactional(value = "TransactionManager",rollbackFor = Exception.class) - public ProcessInstance scanCommand(Logger logger, String host, int vaildThreadNum){ + public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){ ProcessInstance processInstance = null; Command command = findOneCommand(); - if (command == null) { return null; } logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); - processInstance = constructProcessInstance(command, host); - - //cannot construct process instance, return null; - if(processInstance == null){ - logger.error("scan command, command parameter is error: %s", command.toString()); - }else{ - // check thread number enough for this command, if not, change state to waiting thread. - int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); - if(vaildThreadNum < commandThreadCount){ - logger.info("there is not enough thread for this command: {}",command.toString() ); - processInstance.setState(ExecutionStatus.WAITTING_THREAD); - if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){ - processInstance.addHistoryCmd(command.getCommandType()); - } - saveProcessInstance(processInstance); - this.setSubProcessParam(processInstance); - createRecoveryWaitingThreadCommand(command, processInstance); + try{ + processInstance = constructProcessInstance(command, host); + //cannot construct process instance, return null; + if(processInstance == null){ + logger.error("scan command, command parameter is error: %s", command.toString()); + delCommandByid(command.getId()); return null; + }else if(!checkThreadNum(command, validThreadNum)){ + logger.info("there is not enough thread for this command: {}",command.toString() ); + return setWaitingThreadProcess(command, processInstance); }else{ - processInstance.setCommandType(command.getCommandType()); - processInstance.addHistoryCmd(command.getCommandType()); - saveProcessInstance(processInstance); - this.setSubProcessParam(processInstance); + processInstance.setCommandType(command.getCommandType()); + processInstance.addHistoryCmd(command.getCommandType()); + saveProcessInstance(processInstance); + this.setSubProcessParam(processInstance); + delCommandByid(command.getId()); + return processInstance; } + }catch (Exception e){ + logger.error("scan command error ", e); + delCommandByid(command.getId()); } - // delete command - delCommandByid(command.getId()); - return processInstance; + return null; + } + + /** + * set process waiting thread + * @param command + * @param processInstance + * @return + */ + private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) { + processInstance.setState(ExecutionStatus.WAITTING_THREAD); + if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){ + processInstance.addHistoryCmd(command.getCommandType()); + } + saveProcessInstance(processInstance); + this.setSubProcessParam(processInstance); + createRecoveryWaitingThreadCommand(command, processInstance); + return null; + } + + private boolean checkThreadNum(Command command, int validThreadNum) { + int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); + return validThreadNum >= commandThreadCount; } /** @@ -669,7 +685,7 @@ public class ProcessDao extends AbstractBaseDao { paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId())); processInstance.setCommandParam(JSONUtils.toJson(paramMap)); processInstance.setIsSubProcess(Flag.YES); - this.updateProcessInstance(processInstance); + this.saveProcessInstance(processInstance); } // copy parent instance user def params to sub process.. String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID); @@ -677,7 +693,7 @@ public class ProcessDao extends AbstractBaseDao { ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if(parentInstance != null){ processInstance.setGlobalParams(parentInstance.getGlobalParams()); - this.updateProcessInstance(processInstance); + this.saveProcessInstance(processInstance); }else{ logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); }