Browse Source

sleep when resource in not satisfy. fix #1522 (#1523)

* fix #1515

* sleep when resource in not satisfy. fix #1522

* add sleep 1s for no command
pull/2/head
Tboy 5 years ago committed by dailidong
parent
commit
076ccbdb9b
  1. 60
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java

60
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java

@ -98,39 +98,41 @@ public class MasterSchedulerThread implements Runnable {
InterProcessMutex mutex = null;
try {
if(OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory())){
if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters
String znodeLock = zkMasterClient.getMasterLockPath();
mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
mutex.acquire();
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
int activeCount = poolExecutor.getActiveCount();
// make sure to scan and delete command table in one transaction
Command command = processDao.findOneCommand();
if (command != null) {
logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
try{
processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split DAG ...");
masterExecService.execute(new MasterExecThread(processInstance,processDao));
}
}catch (Exception e){
logger.error("scan command error ", e);
processDao.moveToErrorCommand(command, e.toString());
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
if(!runCheckFlag) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters
String znodeLock = zkMasterClient.getMasterLockPath();
mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
mutex.acquire();
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
int activeCount = poolExecutor.getActiveCount();
// make sure to scan and delete command table in one transaction
Command command = processDao.findOneCommand();
if (command != null) {
logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
try{
processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split DAG ...");
masterExecService.execute(new MasterExecThread(processInstance,processDao));
}
}catch (Exception e){
logger.error("scan command error ", e);
processDao.moveToErrorCommand(command, e.toString());
}
} else{
//indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
// accessing the command table every SLEEP_TIME_MILLIS milliseconds
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}catch (Exception e){
logger.error("master scheduler thread exception : " + e.getMessage(),e);
}finally{

Loading…
Cancel
Save