Browse Source

change get alertDao by spring to DaoFactory (#996)

* rename from DatasourceUserMapper to DataSourceUserMapper

* add unit test in UserMapper and WorkerGroupMapper

* change cn.escheduler to org.apache.dolphinscheduler

* add unit test in UdfFuncMapperTest

* add unit test in UdfFuncMapperTest

* remove DatabaseConfiguration

* add ConnectionFactoryTest

* cal duration in processInstancesList

* change desc to description

* change table name in mysql ddl

* change table name in mysql ddl

* change escheduler to dolphinscheduler

* change escheduler to dolphinscheduler

* change escheduler to dolphinscheduler

* remove log4j-1.2-api and modify AlertMapperTest

* remove log4j-1.2-api

* Add alertDao to spring management

* Add alertDao to spring management

* get SqlSessionFactory from MybatisSqlSessionFactoryBean

* get processDao by DaoFactory

* read druid properties in ConneciontFactory

* read druid properties in ConneciontFactory

* change get alertDao by spring to DaoFactory
pull/2/head
lgcareer 5 years ago committed by qiaozhanwei
parent
commit
91229993c7
  1. 22
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
  2. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java
  3. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  4. 21
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  5. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

22
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

@ -20,27 +20,22 @@ import org.apache.dolphinscheduler.alert.runner.AlertSender;
import org.apache.dolphinscheduler.alert.utils.Constants; import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.context.annotation.ComponentScan;
import java.util.List; import java.util.List;
/** /**
* alert of start * alert of start
*/ */
@ComponentScan("org.apache.dolphinscheduler") public class AlertServer {
public class AlertServer implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(AlertServer.class); private static final Logger logger = LoggerFactory.getLogger(AlertServer.class);
/** /**
* Alert Dao * Alert Dao
*/ */
@Autowired private AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
private AlertDao alertDao;
private AlertSender alertSender; private AlertSender alertSender;
@ -61,7 +56,7 @@ public class AlertServer implements CommandLineRunner {
return instance; return instance;
} }
public void start(AlertDao alertDao){ public void start(){
logger.info("Alert Server ready start!"); logger.info("Alert Server ready start!");
while (Stopper.isRunning()){ while (Stopper.isRunning()){
try { try {
@ -77,13 +72,8 @@ public class AlertServer implements CommandLineRunner {
public static void main(String[] args){ public static void main(String[] args){
SpringApplication app = new SpringApplication(AlertServer.class);
app.run(args);
}
@Override
public void run(String... strings) throws Exception {
AlertServer alertServer = AlertServer.getInstance(); AlertServer alertServer = AlertServer.getInstance();
alertServer.start(alertDao); alertServer.start();
} }
} }

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java

@ -42,16 +42,16 @@ public class CombinedApplicationServer extends SpringBootServletInitializer {
ProcessDao processDao = context.getBean(ProcessDao.class); ProcessDao processDao = context.getBean(ProcessDao.class);
AlertDao alertDao = context.getBean(AlertDao.class); AlertDao alertDao = context.getBean(AlertDao.class);
MasterServer master = new MasterServer(processDao,alertDao); MasterServer master = new MasterServer(processDao);
master.run(processDao); master.run(processDao);
WorkerServer workerServer = new WorkerServer(processDao, alertDao); WorkerServer workerServer = new WorkerServer(processDao);
workerServer.run(processDao, alertDao); workerServer.run(processDao);
LoggerServer server = new LoggerServer(); LoggerServer server = new LoggerServer();
server.start(); server.start();
AlertServer alertServer = AlertServer.getInstance(); AlertServer alertServer = AlertServer.getInstance();
alertServer.start(alertDao); alertServer.start();
} }
} }

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -16,20 +16,19 @@
*/ */
package org.apache.dolphinscheduler.server.master; package org.apache.dolphinscheduler.server.master;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob; import org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.server.quartz.QuartzExecutors; import org.apache.dolphinscheduler.server.quartz.QuartzExecutors;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.quartz.SchedulerException; import org.quartz.SchedulerException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -65,9 +64,6 @@ public class MasterServer extends AbstractServer {
@Autowired @Autowired
protected ProcessDao processDao; protected ProcessDao processDao;
@Autowired
protected AlertDao alertDao;
/** /**
* master exec thread pool * master exec thread pool
*/ */
@ -75,14 +71,14 @@ public class MasterServer extends AbstractServer {
public MasterServer(){} public MasterServer(){}
public MasterServer(ProcessDao processDao,AlertDao alertDao){ public MasterServer(ProcessDao processDao){
try { try {
conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH); conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH);
}catch (ConfigurationException e){ }catch (ConfigurationException e){
logger.error("load configuration failed : " + e.getMessage(),e); logger.error("load configuration failed : " + e.getMessage(),e);
System.exit(1); System.exit(1);
} }
zkMasterClient = ZKMasterClient.getZKMasterClient(processDao,alertDao); zkMasterClient = ZKMasterClient.getZKMasterClient(processDao);
this.masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); this.masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
} }
@ -102,7 +98,7 @@ public class MasterServer extends AbstractServer {
@Override @Override
public void run(String... strings) throws Exception { public void run(String... strings) throws Exception {
MasterServer masterServer = new MasterServer(processDao,alertDao); MasterServer masterServer = new MasterServer(processDao);
masterServer.run(processDao); masterServer.run(processDao);

21
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -16,6 +16,9 @@
*/ */
package org.apache.dolphinscheduler.server.worker; package org.apache.dolphinscheduler.server.worker;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
@ -27,15 +30,13 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.AbstractServer; import org.apache.dolphinscheduler.server.master.AbstractServer;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread; import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient; import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -70,8 +71,7 @@ public class WorkerServer extends AbstractServer {
/** /**
* alert database access * alert database access
*/ */
@Autowired private final AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);;
private AlertDao alertDao;
/** /**
* heartbeat thread pool * heartbeat thread pool
@ -93,9 +93,10 @@ public class WorkerServer extends AbstractServer {
*/ */
private ExecutorService fetchTaskExecutorService; private ExecutorService fetchTaskExecutorService;
public WorkerServer(){} public WorkerServer(){
}
public WorkerServer(ProcessDao processDao, AlertDao alertDao){ public WorkerServer(ProcessDao processDao){
try { try {
conf = new PropertiesConfiguration(Constants.WORKER_PROPERTIES_PATH); conf = new PropertiesConfiguration(Constants.WORKER_PROPERTIES_PATH);
}catch (ConfigurationException e){ }catch (ConfigurationException e){
@ -131,9 +132,9 @@ public class WorkerServer extends AbstractServer {
// set the name of the current thread // set the name of the current thread
Thread.currentThread().setName("Worker-Main-Thread"); Thread.currentThread().setName("Worker-Main-Thread");
WorkerServer workerServer = new WorkerServer(processDao,alertDao); WorkerServer workerServer = new WorkerServer(processDao);
workerServer.run(processDao,alertDao); workerServer.run(processDao);
logger.info("worker server started"); logger.info("worker server started");
@ -142,7 +143,7 @@ public class WorkerServer extends AbstractServer {
} }
public void run(ProcessDao processDao, AlertDao alertDao){ public void run(ProcessDao processDao){
// heartbeat interval // heartbeat interval
heartBeatInterval = conf.getInt(Constants.WORKER_HEARTBEAT_INTERVAL, heartBeatInterval = conf.getInt(Constants.WORKER_HEARTBEAT_INTERVAL,

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -85,12 +85,11 @@ public class ZKMasterClient extends AbstractZKClient {
* @param processDao * @param processDao
* @return * @return
*/ */
public static synchronized ZKMasterClient getZKMasterClient(ProcessDao processDao,AlertDao alertDao){ public static synchronized ZKMasterClient getZKMasterClient(ProcessDao processDao){
if(zkMasterClient == null){ if(zkMasterClient == null){
zkMasterClient = new ZKMasterClient(processDao); zkMasterClient = new ZKMasterClient(processDao);
} }
zkMasterClient.processDao = processDao; zkMasterClient.processDao = processDao;
zkMasterClient.alertDao = alertDao;
return zkMasterClient; return zkMasterClient;
} }
@ -99,6 +98,8 @@ public class ZKMasterClient extends AbstractZKClient {
* init * init
*/ */
public void init(){ public void init(){
// init dao
this.initDao();
InterProcessMutex mutex = null; InterProcessMutex mutex = null;
try { try {
@ -134,6 +135,14 @@ public class ZKMasterClient extends AbstractZKClient {
/**
* init dao
*/
public void initDao(){
this.alertDao = DaoFactory.getDaoInstance(AlertDao.class);
// this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
}
/** /**
* get alert dao * get alert dao
* @return * @return

Loading…
Cancel
Save