diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java index 0d77951633..9ac76d2838 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java @@ -20,27 +20,22 @@ import org.apache.dolphinscheduler.alert.runner.AlertSender; import org.apache.dolphinscheduler.alert.utils.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.DaoFactory; import org.apache.dolphinscheduler.dao.entity.Alert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.CommandLineRunner; -import org.springframework.boot.SpringApplication; -import org.springframework.context.annotation.ComponentScan; import java.util.List; /** * alert of start */ -@ComponentScan("org.apache.dolphinscheduler") -public class AlertServer implements CommandLineRunner { +public class AlertServer { private static final Logger logger = LoggerFactory.getLogger(AlertServer.class); /** * Alert Dao */ - @Autowired - private AlertDao alertDao; + private AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class); private AlertSender alertSender; @@ -61,7 +56,7 @@ public class AlertServer implements CommandLineRunner { return instance; } - public void start(AlertDao alertDao){ + public void start(){ logger.info("Alert Server ready start!"); while (Stopper.isRunning()){ try { @@ -77,13 +72,8 @@ public class AlertServer implements CommandLineRunner { public static void main(String[] args){ - SpringApplication app = new SpringApplication(AlertServer.class); - app.run(args); - } - - @Override - public void run(String... strings) throws Exception { AlertServer alertServer = AlertServer.getInstance(); - alertServer.start(alertDao); + alertServer.start(); } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java index 4a9f681ab9..3bb1a46724 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java @@ -42,16 +42,16 @@ public class CombinedApplicationServer extends SpringBootServletInitializer { ProcessDao processDao = context.getBean(ProcessDao.class); AlertDao alertDao = context.getBean(AlertDao.class); - MasterServer master = new MasterServer(processDao,alertDao); + MasterServer master = new MasterServer(processDao); master.run(processDao); - WorkerServer workerServer = new WorkerServer(processDao, alertDao); - workerServer.run(processDao, alertDao); + WorkerServer workerServer = new WorkerServer(processDao); + workerServer.run(processDao); LoggerServer server = new LoggerServer(); server.start(); AlertServer alertServer = AlertServer.getInstance(); - alertServer.start(alertDao); + alertServer.start(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index c1d1a62674..9ab9977ce9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -16,20 +16,19 @@ */ package org.apache.dolphinscheduler.server.master; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang3.StringUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; import org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob; import org.apache.dolphinscheduler.server.quartz.QuartzExecutors; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang3.StringUtils; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,9 +64,6 @@ public class MasterServer extends AbstractServer { @Autowired protected ProcessDao processDao; - @Autowired - protected AlertDao alertDao; - /** * master exec thread pool */ @@ -75,14 +71,14 @@ public class MasterServer extends AbstractServer { public MasterServer(){} - public MasterServer(ProcessDao processDao,AlertDao alertDao){ + public MasterServer(ProcessDao processDao){ try { conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH); }catch (ConfigurationException e){ logger.error("load configuration failed : " + e.getMessage(),e); System.exit(1); } - zkMasterClient = ZKMasterClient.getZKMasterClient(processDao,alertDao); + zkMasterClient = ZKMasterClient.getZKMasterClient(processDao); this.masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); } @@ -102,7 +98,7 @@ public class MasterServer extends AbstractServer { @Override public void run(String... strings) throws Exception { - MasterServer masterServer = new MasterServer(processDao,alertDao); + MasterServer masterServer = new MasterServer(processDao); masterServer.run(processDao); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index b36af51846..a2acf9bb29 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -16,6 +16,9 @@ */ package org.apache.dolphinscheduler.server.worker; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -27,15 +30,13 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.DaoFactory; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.AbstractServer; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread; import org.apache.dolphinscheduler.server.zk.ZKWorkerClient; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -70,8 +71,7 @@ public class WorkerServer extends AbstractServer { /** * alert database access */ - @Autowired - private AlertDao alertDao; + private final AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);; /** * heartbeat thread pool @@ -93,9 +93,10 @@ public class WorkerServer extends AbstractServer { */ private ExecutorService fetchTaskExecutorService; - public WorkerServer(){} + public WorkerServer(){ + } - public WorkerServer(ProcessDao processDao, AlertDao alertDao){ + public WorkerServer(ProcessDao processDao){ try { conf = new PropertiesConfiguration(Constants.WORKER_PROPERTIES_PATH); }catch (ConfigurationException e){ @@ -131,9 +132,9 @@ public class WorkerServer extends AbstractServer { // set the name of the current thread Thread.currentThread().setName("Worker-Main-Thread"); - WorkerServer workerServer = new WorkerServer(processDao,alertDao); + WorkerServer workerServer = new WorkerServer(processDao); - workerServer.run(processDao,alertDao); + workerServer.run(processDao); logger.info("worker server started"); @@ -142,7 +143,7 @@ public class WorkerServer extends AbstractServer { } - public void run(ProcessDao processDao, AlertDao alertDao){ + public void run(ProcessDao processDao){ // heartbeat interval heartBeatInterval = conf.getInt(Constants.WORKER_HEARTBEAT_INTERVAL, diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 409d39a804..90a2679f5b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -85,12 +85,11 @@ public class ZKMasterClient extends AbstractZKClient { * @param processDao * @return */ - public static synchronized ZKMasterClient getZKMasterClient(ProcessDao processDao,AlertDao alertDao){ + public static synchronized ZKMasterClient getZKMasterClient(ProcessDao processDao){ if(zkMasterClient == null){ zkMasterClient = new ZKMasterClient(processDao); } zkMasterClient.processDao = processDao; - zkMasterClient.alertDao = alertDao; return zkMasterClient; } @@ -99,6 +98,8 @@ public class ZKMasterClient extends AbstractZKClient { * init */ public void init(){ + // init dao + this.initDao(); InterProcessMutex mutex = null; try { @@ -134,6 +135,14 @@ public class ZKMasterClient extends AbstractZKClient { + + /** + * init dao + */ + public void initDao(){ + this.alertDao = DaoFactory.getDaoInstance(AlertDao.class); +// this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); + } /** * get alert dao * @return