From 076ccbdb9b3657f286198caf23f500341269dff5 Mon Sep 17 00:00:00 2001 From: Tboy Date: Fri, 20 Dec 2019 15:43:34 +0800 Subject: [PATCH] sleep when resource in not satisfy. fix #1522 (#1523) * fix #1515 * sleep when resource in not satisfy. fix #1522 * add sleep 1s for no command --- .../master/runner/MasterSchedulerThread.java | 60 ++++++++++--------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java index 69c2304cd7..8d7d5a0add 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java @@ -98,39 +98,41 @@ public class MasterSchedulerThread implements Runnable { InterProcessMutex mutex = null; try { - if(OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory())){ - if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) { - - // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters - String znodeLock = zkMasterClient.getMasterLockPath(); - - mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); - mutex.acquire(); - - ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService; - int activeCount = poolExecutor.getActiveCount(); - // make sure to scan and delete command table in one transaction - Command command = processDao.findOneCommand(); - if (command != null) { - logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); - - try{ - processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command); - if (processInstance != null) { - logger.info("start master exec thread , split DAG ..."); - masterExecService.execute(new MasterExecThread(processInstance,processDao)); - } - }catch (Exception e){ - logger.error("scan command error ", e); - processDao.moveToErrorCommand(command, e.toString()); + boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); + if(!runCheckFlag) { + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + continue; + } + if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) { + + // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters + String znodeLock = zkMasterClient.getMasterLockPath(); + + mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); + mutex.acquire(); + + ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService; + int activeCount = poolExecutor.getActiveCount(); + // make sure to scan and delete command table in one transaction + Command command = processDao.findOneCommand(); + if (command != null) { + logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); + + try{ + processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command); + if (processInstance != null) { + logger.info("start master exec thread , split DAG ..."); + masterExecService.execute(new MasterExecThread(processInstance,processDao)); } + }catch (Exception e){ + logger.error("scan command error ", e); + processDao.moveToErrorCommand(command, e.toString()); } + } else{ + //indicate that no command ,sleep for 1s + Thread.sleep(Constants.SLEEP_TIME_MILLIS); } } - - // accessing the command table every SLEEP_TIME_MILLIS milliseconds - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - }catch (Exception e){ logger.error("master scheduler thread exception : " + e.getMessage(),e); }finally{