Browse Source

Refactor worker (#2115)

* refactor worker registry

* refactor master server
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
0febd9530c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 85
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  2. 82
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  3. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
  4. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  5. 88
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  6. 23
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
  7. 69
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

85
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -18,9 +18,6 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
@ -29,11 +26,8 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import org.quartz.SchedulerException; import org.quartz.SchedulerException;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -44,7 +38,6 @@ import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
/** /**
* master server * master server
@ -57,23 +50,6 @@ public class MasterServer {
*/ */
private static final Logger logger = LoggerFactory.getLogger(MasterServer.class); private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
/**
* zk master client
*/
@Autowired
private ZKMasterClient zkMasterClient = null;
/**
* process service
*/
@Autowired
protected ProcessService processService;
/**
* master exec thread pool
*/
private ExecutorService masterSchedulerService;
/** /**
* master config * master config
*/ */
@ -98,6 +74,12 @@ public class MasterServer {
@Autowired @Autowired
private MasterRegistry masterRegistry; private MasterRegistry masterRegistry;
/**
* zk master client
*/
@Autowired
private ZKMasterClient zkMasterClient;
/** /**
* master server startup * master server startup
* *
@ -125,27 +107,13 @@ public class MasterServer {
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
// //
this.zkMasterClient.start();
this.masterRegistry.registry(); this.masterRegistry.registry();
//
zkMasterClient.init();
masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
// master scheduler thread
MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread(
zkMasterClient,
processService,
masterConfig.getMasterExecThreads());
// submit master scheduler thread
masterSchedulerService.execute(masterSchedulerThread);
// start QuartzExecutors // start QuartzExecutors
// what system should do if exception // what system should do if exception
try { try {
logger.info("start Quartz server..."); logger.info("start Quartz server...");
ProcessScheduleJob.init(processService);
QuartzExecutors.getInstance().start(); QuartzExecutors.getInstance().start();
} catch (Exception e) { } catch (Exception e) {
try { try {
@ -162,19 +130,15 @@ public class MasterServer {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
if (zkMasterClient.getActiveMasterNum() <= 1) { close("shutdownHook");
zkMasterClient.getAlertDao().sendServerStopedAlert(
1, OSUtils.getHost(), "Master-Server");
}
close("shutdownhook");
} }
})); }));
} }
/** /**
* gracefully stop * gracefully close
* @param cause why stopping * @param cause
*/ */
public void close(String cause) { public void close(String cause) {
@ -197,40 +161,15 @@ public class MasterServer {
} }
this.nettyRemotingServer.close(); this.nettyRemotingServer.close();
this.masterRegistry.unRegistry(); this.masterRegistry.unRegistry();
this.zkMasterClient.close();
//close quartz //close quartz
try{ try{
QuartzExecutors.getInstance().shutdown(); QuartzExecutors.getInstance().shutdown();
}catch (Exception e){
logger.warn("Quartz service stopped exception:{}",e.getMessage());
}
logger.info("Quartz service stopped"); logger.info("Quartz service stopped");
try {
ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){ }catch (Exception e){
logger.warn("threadPool service stopped exception:{}",e.getMessage()); logger.warn("Quartz service stopped exception:{}",e.getMessage());
}
logger.info("threadPool service stopped");
try {
masterSchedulerService.shutdownNow();
}catch (Exception e){
logger.warn("master scheduler service stopped exception:{}",e.getMessage());
}
logger.info("master scheduler service stopped");
try {
zkMasterClient.close();
}catch (Exception e){
logger.warn("zookeeper service stopped exception:{}",e.getMessage());
} }
logger.info("zookeeper service stopped");
} catch (Exception e) { } catch (Exception e) {
logger.error("master server stop exception ", e); logger.error("master server stop exception ", e);
System.exit(-1); System.exit(-1);

82
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -28,49 +28,43 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutorService; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
/** /**
* master scheduler thread * master scheduler thread
*/ */
public class MasterSchedulerThread implements Runnable { @Service
public class MasterSchedulerService extends Thread {
/** /**
* logger of MasterSchedulerThread * logger of MasterSchedulerThread
*/ */
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerThread.class); private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
/** /**
* master exec service * master exec service
*/ */
private final ExecutorService masterExecService; private ThreadPoolExecutor masterExecService;
/** /**
* dolphinscheduler database interface * dolphinscheduler database interface
*/ */
private final ProcessService processService; @Autowired
private ProcessService processService;
/** /**
* zookeeper master client * zookeeper master client
*/ */
private final ZKMasterClient zkMasterClient ; @Autowired
private ZKMasterClient zkMasterClient;
/**
* master exec thread num
*/
private int masterExecThreadNum;
/**
* master config
*/
private MasterConfig masterConfig;
/** /**
* netty remoting client * netty remoting client
@ -78,21 +72,25 @@ public class MasterSchedulerThread implements Runnable {
private NettyRemotingClient nettyRemotingClient; private NettyRemotingClient nettyRemotingClient;
@Autowired
private MasterConfig masterConfig;
/** /**
* constructor of MasterSchedulerThread * constructor of MasterSchedulerThread
* @param zkClient zookeeper master client
* @param processService process service
* @param masterExecThreadNum master exec thread num
*/ */
public MasterSchedulerThread(ZKMasterClient zkClient, ProcessService processService, int masterExecThreadNum){ @PostConstruct
this.processService = processService; public void init(){
this.zkMasterClient = zkClient; this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
this.masterExecThreadNum = masterExecThreadNum;
this.masterExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",masterExecThreadNum);
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
//
NettyClientConfig clientConfig = new NettyClientConfig(); NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig); this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
super.setName("MasterSchedulerThread");
super.start();
}
@PreDestroy
public void close(){
nettyRemotingClient.close();
logger.info("master schedule service stopped...");
} }
/** /**
@ -100,15 +98,10 @@ public class MasterSchedulerThread implements Runnable {
*/ */
@Override @Override
public void run() { public void run() {
logger.info("master scheduler start successfully..."); logger.info("master scheduler started");
while (Stopper.isRunning()){ while (Stopper.isRunning()){
// process instance
ProcessInstance processInstance = null;
InterProcessMutex mutex = null; InterProcessMutex mutex = null;
try { try {
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
if(!runCheckFlag) { if(!runCheckFlag) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
@ -116,21 +109,16 @@ public class MasterSchedulerThread implements Runnable {
} }
if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) { if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters mutex = zkMasterClient.blockAcquireMutex();
String znodeLock = zkMasterClient.getMasterLockPath();
mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); int activeCount = masterExecService.getActiveCount();
mutex.acquire();
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
int activeCount = poolExecutor.getActiveCount();
// make sure to scan and delete command table in one transaction // make sure to scan and delete command table in one transaction
Command command = processService.findOneCommand(); Command command = processService.findOneCommand();
if (command != null) { if (command != null) {
logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
try{ try{
processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command); ProcessInstance processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterConfig.getMasterExecThreads() - activeCount, command);
if (processInstance != null) { if (processInstance != null) {
logger.info("start master exec thread , split DAG ..."); logger.info("start master exec thread , split DAG ...");
masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient)); masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient));
@ -144,15 +132,11 @@ public class MasterSchedulerThread implements Runnable {
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} }
} }
}catch (Exception e){ } catch (Exception e){
logger.error("master scheduler thread exception",e); logger.error("master scheduler thread exception",e);
}finally{ } finally{
AbstractZKClient.releaseMutex(mutex); zkMasterClient.releaseMutex(mutex);
} }
} }
nettyRemotingClient.close();
logger.info("master server stopped...");
} }
} }

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java

@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.service.zk.AbstractListener; import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -73,6 +74,12 @@ public class ZookeeperNodeManager implements InitializingBean {
@Autowired @Autowired
private ZookeeperRegistryCenter registryCenter; private ZookeeperRegistryCenter registryCenter;
/**
* alert dao
*/
@Autowired
private AlertDao alertDao;
/** /**
* init listener * init listener
* @throws Exception * @throws Exception
@ -136,6 +143,7 @@ public class ZookeeperNodeManager implements InitializingBean {
Set<String> previousNodes = new HashSet<>(workerNodes); Set<String> previousNodes = new HashSet<>(workerNodes);
Set<String> currentNodes = registryCenter.getWorkerGroupNodesDirectly(group); Set<String> currentNodes = registryCenter.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes); syncWorkerGroupNodes(group, currentNodes);
alertDao.sendServerStopedAlert(1, path, "WORKER");
} }
} catch (IllegalArgumentException ignore) { } catch (IllegalArgumentException ignore) {
logger.warn(ignore.getMessage()); logger.warn(ignore.getMessage());
@ -175,6 +183,7 @@ public class ZookeeperNodeManager implements InitializingBean {
Set<String> previousNodes = new HashSet<>(masterNodes); Set<String> previousNodes = new HashSet<>(masterNodes);
Set<String> currentNodes = registryCenter.getMasterNodesDirectly(); Set<String> currentNodes = registryCenter.getMasterNodesDirectly();
syncMasterNodes(currentNodes); syncMasterNodes(currentNodes);
alertDao.sendServerStopedAlert(1, path, "MASTER");
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.error("MasterNodeListener capture data change and get data failed.", ex); logger.error("MasterNodeListener capture data change and get data failed.", ex);

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
@ -78,7 +77,6 @@ public class WorkerServer {
* @param args arguments * @param args arguments
*/ */
public static void main(String[] args) { public static void main(String[] args) {
System.setProperty("spring.profiles.active","worker");
Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER); Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args); new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
} }
@ -136,11 +134,6 @@ public class WorkerServer {
this.nettyRemotingServer.close(); this.nettyRemotingServer.close();
this.workerRegistry.unRegistry(); this.workerRegistry.unRegistry();
try {
ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){
logger.warn("threadPool service stopped exception:{}",e.getMessage());
}
} catch (Exception e) { } catch (Exception e) {
logger.error("worker server stop exception ", e); logger.error("worker server stop exception ", e);
System.exit(-1); System.exit(-1);

88
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -16,22 +16,19 @@
*/ */
package org.apache.dolphinscheduler.server.zk; package org.apache.dolphinscheduler.server.zk;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.utils.ThreadUtils;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient; import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -41,7 +38,6 @@ import org.springframework.stereotype.Component;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadFactory;
/** /**
@ -57,46 +53,19 @@ public class ZKMasterClient extends AbstractZKClient {
*/ */
private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class); private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);
/**
* thread factory
*/
private static final ThreadFactory defaultThreadFactory = ThreadUtils.newGenericThreadFactory("Master-Main-Thread");
/**
* master znode
*/
private String masterZNode = null;
/**
* alert database access
*/
private AlertDao alertDao = null;
/** /**
* process service * process service
*/ */
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
/** public void start() {
* default constructor
*/
private ZKMasterClient(){}
/**
* init
*/
public void init(){
logger.info("initialize master client...");
// init dao
this.initDao();
InterProcessMutex mutex = null; InterProcessMutex mutex = null;
try { try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
String znodeLock = getMasterStartUpLockPath(); String znodeLock = getMasterStartUpLockPath();
mutex = new InterProcessMutex(zkClient, znodeLock); mutex = new InterProcessMutex(getZkClient(), znodeLock);
mutex.acquire(); mutex.acquire();
// init system znode // init system znode
@ -115,20 +84,9 @@ public class ZKMasterClient extends AbstractZKClient {
} }
} }
@Override
/** public void close(){
* init dao super.close();
*/
public void initDao(){
this.alertDao = DaoFactory.getDaoInstance(AlertDao.class);
}
/**
* get alert dao
*
* @return AlertDao
*/
public AlertDao getAlertDao() {
return alertDao;
} }
/** /**
@ -167,8 +125,6 @@ public class ZKMasterClient extends AbstractZKClient {
String serverHost = getHostByEventDataPath(path); String serverHost = getHostByEventDataPath(path);
// handle dead server // handle dead server
handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
//alert server down.
alertServerDown(serverHost, zkNodeType);
//failover server //failover server
if(failover){ if(failover){
failoverServerWhenDown(serverHost, zkNodeType); failoverServerWhenDown(serverHost, zkNodeType);
@ -222,18 +178,6 @@ public class ZKMasterClient extends AbstractZKClient {
} }
} }
/**
* send alert when server down
*
* @param serverHost server host
* @param zkNodeType zookeeper node type
*/
private void alertServerDown(String serverHost, ZKNodeType zkNodeType) {
String serverType = zkNodeType.toString();
alertDao.sendServerStopedAlert(1, serverHost, serverType);
}
/** /**
* monitor master * monitor master
* @param event event * @param event event
@ -271,16 +215,6 @@ public class ZKMasterClient extends AbstractZKClient {
} }
} }
/**
* get master znode
*
* @return master zookeeper node
*/
public String getMasterZNode() {
return masterZNode;
}
/** /**
* task needs failover if task start before worker starts * task needs failover if task start before worker starts
* *
@ -399,4 +333,10 @@ public class ZKMasterClient extends AbstractZKClient {
logger.info("master failover end"); logger.info("master failover end");
} }
public InterProcessMutex blockAcquireMutex() throws Exception {
InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath());
mutex.acquire();
return mutex;
}
} }

23
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.quartz.Job; import org.quartz.Job;
import org.quartz.JobDataMap; import org.quartz.JobDataMap;
@ -44,18 +45,8 @@ public class ProcessScheduleJob implements Job {
*/ */
private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class); private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class);
/** public ProcessService getProcessService(){
* process service return SpringApplicationContext.getBean(ProcessService.class);
*/
private static ProcessService processService;
/**
* init
* @param processService process dao
*/
public static void init(ProcessService processService) {
ProcessScheduleJob.processService = processService;
} }
/** /**
@ -67,7 +58,7 @@ public class ProcessScheduleJob implements Job {
@Override @Override
public void execute(JobExecutionContext context) throws JobExecutionException { public void execute(JobExecutionContext context) throws JobExecutionException {
Assert.notNull(processService, "please call init() method first"); Assert.notNull(getProcessService(), "please call init() method first");
JobDataMap dataMap = context.getJobDetail().getJobDataMap(); JobDataMap dataMap = context.getJobDetail().getJobDataMap();
@ -83,7 +74,7 @@ public class ProcessScheduleJob implements Job {
logger.info("scheduled fire time :{}, fire time :{}, process id :{}", scheduledFireTime, fireTime, scheduleId); logger.info("scheduled fire time :{}, fire time :{}, process id :{}", scheduledFireTime, fireTime, scheduleId);
// query schedule // query schedule
Schedule schedule = processService.querySchedule(scheduleId); Schedule schedule = getProcessService().querySchedule(scheduleId);
if (schedule == null) { if (schedule == null) {
logger.warn("process schedule does not exist in db,delete schedule job in quartz, projectId:{}, scheduleId:{}", projectId, scheduleId); logger.warn("process schedule does not exist in db,delete schedule job in quartz, projectId:{}, scheduleId:{}", projectId, scheduleId);
deleteJob(projectId, scheduleId); deleteJob(projectId, scheduleId);
@ -91,7 +82,7 @@ public class ProcessScheduleJob implements Job {
} }
ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId()); ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId());
// release state : online/offline // release state : online/offline
ReleaseState releaseState = processDefinition.getReleaseState(); ReleaseState releaseState = processDefinition.getReleaseState();
if (processDefinition == null || releaseState == ReleaseState.OFFLINE) { if (processDefinition == null || releaseState == ReleaseState.OFFLINE) {
@ -111,7 +102,7 @@ public class ProcessScheduleJob implements Job {
command.setWarningType(schedule.getWarningType()); command.setWarningType(schedule.getWarningType());
command.setProcessInstancePriority(schedule.getProcessInstancePriority()); command.setProcessInstancePriority(schedule.getProcessInstancePriority());
processService.createCommand(command); getProcessService().createCommand(command);
} }

69
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@ -16,19 +16,15 @@
*/ */
package org.apache.dolphinscheduler.service.zk; package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ResInfo; import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.*; import java.util.*;
@ -37,15 +33,11 @@ import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* abstract zookeeper client * abstract zookeeper client
*/ */
@Component
public abstract class AbstractZKClient extends ZookeeperCachedOperator { public abstract class AbstractZKClient extends ZookeeperCachedOperator {
private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
/**
* server stop or not
*/
protected IStoppable stoppable = null;
/** /**
* check dead server or not , if dead, stop self * check dead server or not , if dead, stop self
* *
@ -65,8 +57,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
if(!isExisted(zNode) || isExisted(deadServerPath)){ if(!isExisted(zNode) || isExisted(deadServerPath)){
return true; return true;
} }
return false; return false;
} }
@ -99,28 +89,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
return registerPath; return registerPath;
} }
/**
* register server, if server already exists, return null.
* @param zkNodeType zookeeper node type
* @return register server path in zookeeper
* @throws Exception errors
*/
public String registerServer(ZKNodeType zkNodeType) throws Exception {
String registerPath = null;
String host = OSUtils.getHost();
if(checkZKNodeExists(host, zkNodeType)){
logger.error("register failure , {} server already started on host : {}" ,
zkNodeType.toString(), host);
return registerPath;
}
registerPath = createZNodePath(zkNodeType, host);
// handle dead server
handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
return registerPath;
}
/** /**
* opType(add): if find dead server , then add to zk deadServerPath * opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk * opType(delete): delete path from zk
@ -152,16 +120,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
} }
/**
* for stop server
* @param serverStoppable server stoppable interface
*/
public void setStoppable(IStoppable serverStoppable){
this.stoppable = serverStoppable;
}
/** /**
* get active master num * get active master num
* @return active master number * @return active master number
@ -275,14 +233,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS; return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
} }
/**
*
* @return get master lock path
*/
public String getWorkerLockPath(){
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS;
}
/** /**
* *
* @param zkNodeType zookeeper node type * @param zkNodeType zookeeper node type
@ -339,7 +289,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
* release mutex * release mutex
* @param mutex mutex * @param mutex mutex
*/ */
public static void releaseMutex(InterProcessMutex mutex) { public void releaseMutex(InterProcessMutex mutex) {
if (mutex != null){ if (mutex != null){
try { try {
mutex.release(); mutex.release();
@ -387,18 +337,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
return pathArray[pathArray.length - 1]; return pathArray[pathArray.length - 1];
} }
/**
* acquire zk lock
* @param zkClient zk client
* @param zNodeLockPath zk lock path
* @return zk lock
* @throws Exception errors
*/
public InterProcessMutex acquireZkLock(CuratorFramework zkClient,String zNodeLockPath)throws Exception{
InterProcessMutex mutex = new InterProcessMutex(zkClient, zNodeLockPath);
mutex.acquire();
return mutex;
}
@Override @Override
public String toString() { public String toString() {
@ -407,7 +345,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' + ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' + ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' + ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' +
", stoppable=" + stoppable +
'}'; '}';
} }
} }

Loading…
Cancel
Save