From 09dec3b7a8eb86896f0882b79d4a86033220ffd2 Mon Sep 17 00:00:00 2001 From: Baoqi Wu Date: Sun, 17 Nov 2019 10:10:45 +0800 Subject: [PATCH] fix #1245, make scanCommand transactional (#1246) --- .../dolphinscheduler/dao/ProcessDao.java | 56 +++++++------------ .../master/runner/MasterSchedulerThread.java | 19 +++++-- 2 files changed, 36 insertions(+), 39 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java index a611c52f55..d42a94c848 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java @@ -109,58 +109,44 @@ public class ProcessDao { /** - * find one command from command queue, construct process instance + * handle Command (construct ProcessInstance from Command) , wrapped in transaction * @param logger logger * @param host host * @param validThreadNum validThreadNum + * @param command found command * @return process instance */ @Transactional(rollbackFor = Exception.class) - public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){ - - ProcessInstance processInstance = null; - Command command = findOneCommand(); - if (command == null) { + public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) { + ProcessInstance processInstance = constructProcessInstance(command, host); + //cannot construct process instance, return null; + if(processInstance == null){ + logger.error("scan command, command parameter is error: %s", command.toString()); + moveToErrorCommand(command, "process instance is null"); return null; } - logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); - - try{ - processInstance = constructProcessInstance(command, host); - //cannot construct process instance, return null; - if(processInstance == null){ - logger.error("scan command, command parameter is error: %s", command.toString()); - delCommandByid(command.getId()); - saveErrorCommand(command, "process instance is null"); - return null; - } - if(!checkThreadNum(command, validThreadNum)){ - logger.info("there is not enough thread for this command: {}",command.toString() ); - return setWaitingThreadProcess(command, processInstance); - } - processInstance.setCommandType(command.getCommandType()); - processInstance.addHistoryCmd(command.getCommandType()); - saveProcessInstance(processInstance); - this.setSubProcessParam(processInstance); - delCommandByid(command.getId()); - return processInstance; - }catch (Exception e){ - logger.error("scan command error ", e); - saveErrorCommand(command, e.toString()); - delCommandByid(command.getId()); + if(!checkThreadNum(command, validThreadNum)){ + logger.info("there is not enough thread for this command: {}",command.toString() ); + return setWaitingThreadProcess(command, processInstance); } - return null; + processInstance.setCommandType(command.getCommandType()); + processInstance.addHistoryCmd(command.getCommandType()); + saveProcessInstance(processInstance); + this.setSubProcessParam(processInstance); + delCommandByid(command.getId()); + return processInstance; } /** - * save error command + * save error command, and delete original command * @param command command * @param message message */ - private void saveErrorCommand(Command command, String message) { - + @Transactional(rollbackFor = Exception.class) + public void moveToErrorCommand(Command command, String message) { ErrorCommand errorCommand = new ErrorCommand(command, message); this.errorCommandMapper.insert(errorCommand); + delCommandByid(command.getId()); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java index dc1c2fb75f..48ea415282 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.zk.AbstractZKClient; import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.commons.configuration.Configuration; @@ -108,10 +109,20 @@ public class MasterSchedulerThread implements Runnable { ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService; int activeCount = poolExecutor.getActiveCount(); // make sure to scan and delete command table in one transaction - processInstance = processDao.scanCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount); - if (processInstance != null) { - logger.info("start master exex thread , split DAG ..."); - masterExecService.execute(new MasterExecThread(processInstance,processDao)); + Command command = processDao.findOneCommand(); + if (command != null) { + logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); + + try{ + processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command); + if (processInstance != null) { + logger.info("start master exec thread , split DAG ..."); + masterExecService.execute(new MasterExecThread(processInstance,processDao)); + } + }catch (Exception e){ + logger.error("scan command error ", e); + processDao.moveToErrorCommand(command, e.toString()); + } } } }