|
|
|
@ -28,49 +28,43 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient;
|
|
|
|
|
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.config.MasterConfig; |
|
|
|
|
import org.apache.dolphinscheduler.server.zk.ZKMasterClient; |
|
|
|
|
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
|
|
|
|
import org.apache.dolphinscheduler.service.process.ProcessService; |
|
|
|
|
import org.apache.dolphinscheduler.service.zk.AbstractZKClient; |
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
|
|
|
|
|
|
import java.util.concurrent.ExecutorService; |
|
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
|
import javax.annotation.PreDestroy; |
|
|
|
|
import java.util.concurrent.ThreadPoolExecutor; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* master scheduler thread |
|
|
|
|
*/ |
|
|
|
|
public class MasterSchedulerThread implements Runnable { |
|
|
|
|
@Service |
|
|
|
|
public class MasterSchedulerService extends Thread { |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* logger of MasterSchedulerThread |
|
|
|
|
*/ |
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerThread.class); |
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* master exec service |
|
|
|
|
*/ |
|
|
|
|
private final ExecutorService masterExecService; |
|
|
|
|
private ThreadPoolExecutor masterExecService; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* dolphinscheduler database interface
|
|
|
|
|
*/ |
|
|
|
|
private final ProcessService processService; |
|
|
|
|
@Autowired |
|
|
|
|
private ProcessService processService; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* zookeeper master client |
|
|
|
|
*/ |
|
|
|
|
private final ZKMasterClient zkMasterClient ; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* master exec thread num |
|
|
|
|
*/ |
|
|
|
|
private int masterExecThreadNum; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* master config |
|
|
|
|
*/ |
|
|
|
|
private MasterConfig masterConfig; |
|
|
|
|
@Autowired |
|
|
|
|
private ZKMasterClient zkMasterClient; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* netty remoting client |
|
|
|
@ -78,21 +72,25 @@ public class MasterSchedulerThread implements Runnable {
|
|
|
|
|
private NettyRemotingClient nettyRemotingClient; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Autowired |
|
|
|
|
private MasterConfig masterConfig; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* constructor of MasterSchedulerThread |
|
|
|
|
* @param zkClient zookeeper master client |
|
|
|
|
* @param processService process service |
|
|
|
|
* @param masterExecThreadNum master exec thread num |
|
|
|
|
*/ |
|
|
|
|
public MasterSchedulerThread(ZKMasterClient zkClient, ProcessService processService, int masterExecThreadNum){ |
|
|
|
|
this.processService = processService; |
|
|
|
|
this.zkMasterClient = zkClient; |
|
|
|
|
this.masterExecThreadNum = masterExecThreadNum; |
|
|
|
|
this.masterExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",masterExecThreadNum); |
|
|
|
|
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); |
|
|
|
|
//
|
|
|
|
|
@PostConstruct |
|
|
|
|
public void init(){ |
|
|
|
|
this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); |
|
|
|
|
NettyClientConfig clientConfig = new NettyClientConfig(); |
|
|
|
|
this.nettyRemotingClient = new NettyRemotingClient(clientConfig); |
|
|
|
|
super.setName("MasterSchedulerThread"); |
|
|
|
|
super.start(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@PreDestroy |
|
|
|
|
public void close(){ |
|
|
|
|
nettyRemotingClient.close(); |
|
|
|
|
logger.info("master schedule service stopped..."); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -100,15 +98,10 @@ public class MasterSchedulerThread implements Runnable {
|
|
|
|
|
*/ |
|
|
|
|
@Override |
|
|
|
|
public void run() { |
|
|
|
|
logger.info("master scheduler start successfully..."); |
|
|
|
|
logger.info("master scheduler started"); |
|
|
|
|
while (Stopper.isRunning()){ |
|
|
|
|
|
|
|
|
|
// process instance
|
|
|
|
|
ProcessInstance processInstance = null; |
|
|
|
|
|
|
|
|
|
InterProcessMutex mutex = null; |
|
|
|
|
try { |
|
|
|
|
|
|
|
|
|
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); |
|
|
|
|
if(!runCheckFlag) { |
|
|
|
|
Thread.sleep(Constants.SLEEP_TIME_MILLIS); |
|
|
|
@ -116,21 +109,16 @@ public class MasterSchedulerThread implements Runnable {
|
|
|
|
|
} |
|
|
|
|
if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) { |
|
|
|
|
|
|
|
|
|
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters
|
|
|
|
|
String znodeLock = zkMasterClient.getMasterLockPath(); |
|
|
|
|
mutex = zkMasterClient.blockAcquireMutex(); |
|
|
|
|
|
|
|
|
|
mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); |
|
|
|
|
mutex.acquire(); |
|
|
|
|
|
|
|
|
|
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService; |
|
|
|
|
int activeCount = poolExecutor.getActiveCount(); |
|
|
|
|
int activeCount = masterExecService.getActiveCount(); |
|
|
|
|
// make sure to scan and delete command table in one transaction
|
|
|
|
|
Command command = processService.findOneCommand(); |
|
|
|
|
if (command != null) { |
|
|
|
|
logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); |
|
|
|
|
|
|
|
|
|
try{ |
|
|
|
|
processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command); |
|
|
|
|
ProcessInstance processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterConfig.getMasterExecThreads() - activeCount, command); |
|
|
|
|
if (processInstance != null) { |
|
|
|
|
logger.info("start master exec thread , split DAG ..."); |
|
|
|
|
masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient)); |
|
|
|
@ -144,15 +132,11 @@ public class MasterSchedulerThread implements Runnable {
|
|
|
|
|
Thread.sleep(Constants.SLEEP_TIME_MILLIS); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}catch (Exception e){ |
|
|
|
|
} catch (Exception e){ |
|
|
|
|
logger.error("master scheduler thread exception",e); |
|
|
|
|
}finally{ |
|
|
|
|
AbstractZKClient.releaseMutex(mutex); |
|
|
|
|
} finally{ |
|
|
|
|
zkMasterClient.releaseMutex(mutex); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
nettyRemotingClient.close(); |
|
|
|
|
logger.info("master server stopped..."); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |