Browse Source

sleep when resource in not satisfy. fix #1522

pull/2/head
dk.technoboy 5 years ago
parent
commit
836bf483f8
  1. 57
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java

57
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java

@ -98,39 +98,38 @@ public class MasterSchedulerThread implements Runnable {
InterProcessMutex mutex = null;
try {
if(OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory())){
if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters
String znodeLock = zkMasterClient.getMasterLockPath();
mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
mutex.acquire();
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
int activeCount = poolExecutor.getActiveCount();
// make sure to scan and delete command table in one transaction
Command command = processDao.findOneCommand();
if (command != null) {
logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
try{
processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split DAG ...");
masterExecService.execute(new MasterExecThread(processInstance,processDao));
}
}catch (Exception e){
logger.error("scan command error ", e);
processDao.moveToErrorCommand(command, e.toString());
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
if(!runCheckFlag) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters
String znodeLock = zkMasterClient.getMasterLockPath();
mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
mutex.acquire();
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
int activeCount = poolExecutor.getActiveCount();
// make sure to scan and delete command table in one transaction
Command command = processDao.findOneCommand();
if (command != null) {
logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
try{
processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split DAG ...");
masterExecService.execute(new MasterExecThread(processInstance,processDao));
}
}catch (Exception e){
logger.error("scan command error ", e);
processDao.moveToErrorCommand(command, e.toString());
}
}
}
// accessing the command table every SLEEP_TIME_MILLIS milliseconds
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}catch (Exception e){
logger.error("master scheduler thread exception : " + e.getMessage(),e);
}finally{

Loading…
Cancel
Save