From 1a4b28865f71b8666f2dd46c3d15560a255e2b0b Mon Sep 17 00:00:00 2001 From: Tboy Date: Fri, 15 Nov 2019 18:39:32 +0800 Subject: [PATCH] refactor masterServer and workerServer (#1244) * move updateTaskState into try/catch block in case of exception * fix NPE * using conf.getInt instead of getString * for AbstractZKClient, remove the log, for it will print the same log message in createZNodePath. for AlertDao, correct the spelling. * duplicate * refactor getTaskWorkerGroupId * add friendly log * update hearbeat thread num = 1 * fix the bug when worker execute task using queue. and remove checking Tenant user anymore in TaskScheduleThread * 1. move verifyTaskInstanceIsNull after taskInstance 2. keep verifyTenantIsNull/verifyTaskInstanceIsNull clean and readable * fix the message * delete before check to avoid KeeperException$NoNodeException * fix the message * check processInstance state before delete tenant * check processInstance state before delete worker group * refactor * merge api constants into common constatns * update the resource perm * update the dataSource perm * fix CheckUtils.checkUserParams method * update AlertGroupService, extends from BaseService, remove duplicate methods * refactor * modify method name * add hasProjectAndPerm method * using checkProject instead of getResultStatus * delete checkAuth method, using hasProjectAndPerm instead. * correct spelling * add transactional for deleteWorkerGroupById * add Transactional for deleteProcessInstanceById method * change sqlSessionTemplate singleton * change sqlSessionTemplate singleton and reformat code * fix unsuitable error message * update shutdownhook methods * fix worker log bug * fix api server debug mode bug * upgrade zk version * delete this line ,for zkClient.close() will do the whole thing * fix master server shutdown error * degrade zk version and add FourLetterWordMain class * fix PathChildrenCache not close * add Transactional for createSession method * add more message for java-doc * delete App, let spring manage connectionFactory * add license * add class Application for test support * refactor masterServer and workerServer * add args --- .../api/CombinedApplicationServer.java | 14 +-- .../server/master/AbstractServer.java | 37 +----- .../server/master/MasterServer.java | 57 +++------ ...ion.java => SpringApplicationContext.java} | 4 +- .../server/worker/WorkerServer.java | 112 +++++++++--------- .../server/worker/task/AbstractYarnTask.java | 4 +- .../task/dependent/DependentExecute.java | 4 +- .../worker/task/dependent/DependentTask.java | 4 +- .../server/worker/task/http/HttpTask.java | 4 +- .../task/processdure/ProcedureTask.java | 4 +- .../server/worker/task/python/PythonTask.java | 4 +- .../server/worker/task/shell/ShellTask.java | 4 +- .../server/worker/task/sql/SqlTask.java | 6 +- .../shell/ShellCommandExecutorTest.java | 4 +- .../server/worker/sql/SqlExecutorTest.java | 4 +- 15 files changed, 98 insertions(+), 168 deletions(-) rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/{SpringApplication.java => SpringApplicationContext.java} (90%) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java index 3bb1a46724..1c84829755 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java @@ -17,16 +17,12 @@ package org.apache.dolphinscheduler.api; import org.apache.dolphinscheduler.alert.AlertServer; -import org.apache.dolphinscheduler.dao.AlertDao; -import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.server.master.MasterServer; import org.apache.dolphinscheduler.server.rpc.LoggerServer; import org.apache.dolphinscheduler.server.worker.WorkerServer; -import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; -import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.ComponentScan; import springfox.documentation.swagger2.annotations.EnableSwagger2; @@ -38,15 +34,11 @@ public class CombinedApplicationServer extends SpringBootServletInitializer { public static void main(String[] args) throws Exception { - ConfigurableApplicationContext context = SpringApplication.run(ApiApplicationServer.class, args); - ProcessDao processDao = context.getBean(ProcessDao.class); - AlertDao alertDao = context.getBean(AlertDao.class); + ApiApplicationServer.main(args); - MasterServer master = new MasterServer(processDao); - master.run(processDao); + MasterServer.main(args); - WorkerServer workerServer = new WorkerServer(processDao); - workerServer.run(processDao); + WorkerServer.main(args); LoggerServer server = new LoggerServer(); server.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/AbstractServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/AbstractServer.java index 41cd993846..6ab677dfb1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/AbstractServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/AbstractServer.java @@ -26,54 +26,19 @@ import org.springframework.context.annotation.ComponentScan; /** * master server */ -@ComponentScan("org.apache.dolphinscheduler") -public abstract class AbstractServer implements CommandLineRunner, IStoppable { +public abstract class AbstractServer implements IStoppable { - /** - * logger of AbstractServer - */ - private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class); /** * abstract server onfiguration */ protected static Configuration conf; - /** - * object lock - */ - protected final Object lock = new Object(); - - /** - * whether or not to close the state - */ - protected boolean terminated = false; - /** * heartbeat interval, unit second */ protected int heartBeatInterval; - /** - * blocking implement - * @throws InterruptedException reasonInter - */ - public void awaitTermination() throws InterruptedException { - synchronized (lock) { - while (!terminated) { - lock.wait(); - } - } - } - - /** - * Callback used to run the bean. - * @param args incoming main method arguments - * @throws Exception on error - */ - @Override - public abstract void run(String... args) throws Exception; - /** * gracefully stop * @param cause why stopping diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 95c2054d95..b3710bc6b5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -36,6 +36,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.context.annotation.ComponentScan; +import javax.annotation.PostConstruct; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -54,7 +55,7 @@ public class MasterServer extends AbstractServer { /** * zk master client */ - private static ZKMasterClient zkMasterClient = null; + private ZKMasterClient zkMasterClient = null; /** * heartbeat thread pool @@ -72,25 +73,6 @@ public class MasterServer extends AbstractServer { */ private ExecutorService masterSchedulerService; - /** - * default constructor - */ - public MasterServer(){} - - /** - * constructor of MasterServers - * @param processDao process dao - */ - public MasterServer(ProcessDao processDao){ - try { - conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH); - }catch (ConfigurationException e){ - logger.error("load configuration failed : " + e.getMessage(),e); - System.exit(1); - } - zkMasterClient = ZKMasterClient.getZKMasterClient(processDao); - this.masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); - } /** * master server startup @@ -99,29 +81,26 @@ public class MasterServer extends AbstractServer { * @param args arguments */ public static void main(String[] args) { - SpringApplication app = new SpringApplication(MasterServer.class); + SpringApplication.run(MasterServer.class, args); - app.run(args); - } - - - @Override - public void run(String... strings) throws Exception { - - MasterServer masterServer = new MasterServer(processDao); - - masterServer.run(processDao); - - logger.info("master server started"); - // blocking - masterServer.awaitTermination(); } /** * run master server - * @param processDao process dao */ - public void run(ProcessDao processDao){ + @PostConstruct + public void run(){ + + try { + conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH); + }catch (ConfigurationException e){ + logger.error("load configuration failed : " + e.getMessage(),e); + System.exit(1); + } + + masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); + + zkMasterClient = ZKMasterClient.getZKMasterClient(processDao); // heartbeat interval heartBeatInterval = conf.getInt(Constants.MASTER_HEARTBEAT_INTERVAL, @@ -251,10 +230,6 @@ public class MasterServer extends AbstractServer { logger.info("zookeeper service stopped"); - synchronized (lock) { - terminated = true; - lock.notifyAll(); - } } catch (Exception e) { logger.error("master server stop exception : " + e.getMessage(), e); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SpringApplication.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SpringApplicationContext.java similarity index 90% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SpringApplication.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SpringApplicationContext.java index 1b22be9c5b..96087e5a52 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SpringApplication.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SpringApplicationContext.java @@ -23,13 +23,13 @@ import org.springframework.stereotype.Component; @Component -public class SpringApplication implements ApplicationContextAware { +public class SpringApplicationContext implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - SpringApplication.applicationContext = applicationContext; + SpringApplicationContext.applicationContext = applicationContext; } public static T getBean(Class requiredType){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 770da380e4..4d52e5f259 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.AbstractServer; import org.apache.dolphinscheduler.server.utils.ProcessUtils; +import org.apache.dolphinscheduler.server.utils.SpringApplicationContext; import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread; import org.apache.dolphinscheduler.server.zk.ZKWorkerClient; import org.slf4j.Logger; @@ -44,7 +45,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.context.annotation.ComponentScan; +import javax.annotation.PostConstruct; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -64,7 +67,8 @@ public class WorkerServer extends AbstractServer { /** * zk worker client */ - private static ZKWorkerClient zkWorkerClient = null; + private ZKWorkerClient zkWorkerClient = null; + /** * process database access @@ -81,7 +85,7 @@ public class WorkerServer extends AbstractServer { /** * heartbeat thread pool */ - private ScheduledExecutorService heartbeatWorerService; + private ScheduledExecutorService heartbeatWorkerService; /** * task queue impl @@ -98,25 +102,17 @@ public class WorkerServer extends AbstractServer { */ private ExecutorService fetchTaskExecutorService; - public WorkerServer(){ - } - - public WorkerServer(ProcessDao processDao){ - try { - conf = new PropertiesConfiguration(Constants.WORKER_PROPERTIES_PATH); - }catch (ConfigurationException e){ - logger.error("load configuration failed",e); - System.exit(1); - } - - zkWorkerClient = ZKWorkerClient.getZKWorkerClient(); - - this.taskQueue = TaskQueueFactory.getTaskQueueInstance(); - - this.killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor"); + /** + * spring application context + * only use it for initialization + */ + @Autowired + private SpringApplicationContext springApplicationContext; - this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); - } + /** + * CountDownLatch latch + */ + private CountDownLatch latch; /** * master server startup @@ -125,38 +121,36 @@ public class WorkerServer extends AbstractServer { * @param args arguments */ public static void main(String[] args) { - SpringApplication app = new SpringApplication(WorkerServer.class); - - app.run(args); + SpringApplication.run(WorkerServer.class,args); } - @Override - public void run(String... args) throws Exception { - // set the name of the current thread - Thread.currentThread().setName("Worker-Main-Thread"); + /** + * worker server run + */ + @PostConstruct + public void run(){ - WorkerServer workerServer = new WorkerServer(processDao); + try { + conf = new PropertiesConfiguration(Constants.WORKER_PROPERTIES_PATH); + }catch (ConfigurationException e){ + logger.error("load configuration failed",e); + System.exit(1); + } - workerServer.run(processDao); + zkWorkerClient = ZKWorkerClient.getZKWorkerClient(); - logger.info("worker server started"); + this.taskQueue = TaskQueueFactory.getTaskQueueInstance(); - // blocking - workerServer.awaitTermination(); - } + this.killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor"); - /** - * worker server run - * @param processDao process dao - */ - public void run(ProcessDao processDao){ + this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); // heartbeat interval heartBeatInterval = conf.getInt(Constants.WORKER_HEARTBEAT_INTERVAL, Constants.defaultWorkerHeartbeatInterval); - heartbeatWorerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.defaulWorkerHeartbeatThreadNum); + heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.defaulWorkerHeartbeatThreadNum); // heartbeat thread implement Runnable heartBeatThread = heartBeatThread(); @@ -165,15 +159,25 @@ public class WorkerServer extends AbstractServer { // regular heartbeat // delay 5 seconds, send heartbeat every 30 seconds - heartbeatWorerService. - scheduleAtFixedRate(heartBeatThread, 5, heartBeatInterval, TimeUnit.SECONDS); + heartbeatWorkerService.scheduleAtFixedRate(heartBeatThread, 5, heartBeatInterval, TimeUnit.SECONDS); // kill process thread implement - Runnable killProcessThread = getKillProcessThread(processDao); + Runnable killProcessThread = getKillProcessThread(); // submit kill process thread killExecutorService.execute(killProcessThread); + + + // get worker number of concurrent tasks + int taskNum = conf.getInt(Constants.WORKER_FETCH_TASK_NUM,Constants.defaultWorkerFetchTaskNum); + + // new fetch task thread + FetchTaskThread fetchTaskThread = new FetchTaskThread(taskNum,zkWorkerClient, processDao,conf, taskQueue); + + // submit fetch task thread + fetchTaskExecutorService.execute(fetchTaskThread); + /** * register hooks, which are called before the process exits */ @@ -190,14 +194,12 @@ public class WorkerServer extends AbstractServer { } })); - // get worker number of concurrent tasks - int taskNum = conf.getInt(Constants.WORKER_FETCH_TASK_NUM,Constants.defaultWorkerFetchTaskNum); - - // new fetch task thread - FetchTaskThread fetchTaskThread = new FetchTaskThread(taskNum,zkWorkerClient, processDao,conf, taskQueue); - - // submit fetch task thread - fetchTaskExecutorService.execute(fetchTaskThread); + //let the main thread await + latch = new CountDownLatch(1); + try { + latch.await(); + } catch (InterruptedException ignore) { + } } @Override @@ -222,7 +224,7 @@ public class WorkerServer extends AbstractServer { } try { - heartbeatWorerService.shutdownNow(); + heartbeatWorkerService.shutdownNow(); }catch (Exception e){ logger.warn("heartbeat service stopped exception"); } @@ -255,13 +257,9 @@ public class WorkerServer extends AbstractServer { }catch (Exception e){ logger.warn("zookeeper service stopped exception:{}",e.getMessage()); } + latch.countDown(); logger.info("zookeeper service stopped"); - //notify - synchronized (lock) { - terminated = true; - lock.notifyAll(); - } } catch (Exception e) { logger.error("worker server stop exception : " + e.getMessage(), e); System.exit(-1); @@ -295,7 +293,7 @@ public class WorkerServer extends AbstractServer { * * @return kill process thread */ - private Runnable getKillProcessThread(ProcessDao processDao){ + private Runnable getKillProcessThread(){ Runnable killProcessThread = new Runnable() { @Override public void run() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java index 1bdee3a6bf..776290aafa 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.utils.ProcessUtils; -import org.apache.dolphinscheduler.server.utils.SpringApplication; +import org.apache.dolphinscheduler.server.utils.SpringApplicationContext; import org.slf4j.Logger; /** @@ -48,7 +48,7 @@ public abstract class AbstractYarnTask extends AbstractTask { */ public AbstractYarnTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); - this.processDao = SpringApplication.getBean(ProcessDao.class); + this.processDao = SpringApplicationContext.getBean(ProcessDao.class); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getTaskDir(), taskProps.getTaskAppId(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java index f796dc0115..5dc25b8935 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java @@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.utils.SpringApplication; +import org.apache.dolphinscheduler.server.utils.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +39,7 @@ public class DependentExecute { /** * process dao */ - private static final ProcessDao processDao = SpringApplication.getBean(ProcessDao.class); + private final ProcessDao processDao = SpringApplicationContext.getBean(ProcessDao.class); /** * depend item list diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java index 3aab8b3027..30c5a1a5d2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java @@ -27,7 +27,7 @@ import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.utils.SpringApplication; +import org.apache.dolphinscheduler.server.utils.SpringApplicationContext; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.slf4j.Logger; @@ -88,7 +88,7 @@ public class DependentTask extends AbstractTask { taskModel.getDependItemList(), taskModel.getRelation())); } - this.processDao = SpringApplication.getBean(ProcessDao.class); + this.processDao = SpringApplicationContext.getBean(ProcessDao.class); if(taskProps.getScheduleTime() != null){ this.dependentDate = taskProps.getScheduleTime(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java index f4132a5bf9..a5c7390499 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java @@ -33,7 +33,7 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.server.utils.ParamUtils; -import org.apache.dolphinscheduler.server.utils.SpringApplication; +import org.apache.dolphinscheduler.server.utils.SpringApplicationContext; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.http.HttpEntity; @@ -92,7 +92,7 @@ public class HttpTask extends AbstractTask { */ public HttpTask(TaskProps props, Logger logger) { super(props, logger); - this.processDao = SpringApplication.getBean(ProcessDao.class); + this.processDao = SpringApplicationContext.getBean(ProcessDao.class); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java index a276a672e9..3418c741f8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java @@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.server.utils.ParamUtils; -import org.apache.dolphinscheduler.server.utils.SpringApplication; +import org.apache.dolphinscheduler.server.utils.SpringApplicationContext; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.slf4j.Logger; @@ -82,7 +82,7 @@ public class ProcedureTask extends AbstractTask { throw new RuntimeException("procedure task params is not valid"); } - this.processDao = SpringApplication.getBean(ProcessDao.class); + this.processDao = SpringApplicationContext.getBean(ProcessDao.class); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index 601e8399b7..fffd5f080d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.server.utils.ParamUtils; -import org.apache.dolphinscheduler.server.utils.SpringApplication; +import org.apache.dolphinscheduler.server.utils.SpringApplicationContext; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.TaskProps; @@ -76,7 +76,7 @@ public class PythonTask extends AbstractTask { taskProps.getTaskStartTime(), taskProps.getTaskTimeout(), logger); - this.processDao = SpringApplication.getBean(ProcessDao.class); + this.processDao = SpringApplicationContext.getBean(ProcessDao.class); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index 2a1ccdb999..fb7d2268e5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.server.utils.ParamUtils; -import org.apache.dolphinscheduler.server.utils.SpringApplication; +import org.apache.dolphinscheduler.server.utils.SpringApplicationContext; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.TaskProps; @@ -84,7 +84,7 @@ public class ShellTask extends AbstractTask { taskProps.getTaskStartTime(), taskProps.getTaskTimeout(), logger); - this.processDao = SpringApplication.getBean(ProcessDao.class); + this.processDao = SpringApplicationContext.getBean(ProcessDao.class); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index b592178b59..68a63ea9af 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -43,7 +43,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.server.utils.ParamUtils; -import org.apache.dolphinscheduler.server.utils.SpringApplication; +import org.apache.dolphinscheduler.server.utils.SpringApplicationContext; import org.apache.dolphinscheduler.server.utils.UDFUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskProps; @@ -97,8 +97,8 @@ public class SqlTask extends AbstractTask { if (!sqlParameters.checkParameters()) { throw new RuntimeException("sql task params is not valid"); } - this.processDao = SpringApplication.getBean(ProcessDao.class); - this.alertDao = SpringApplication.getBean(AlertDao.class); + this.processDao = SpringApplicationContext.getBean(ProcessDao.class); + this.alertDao = SpringApplicationContext.getBean(AlertDao.class); } @Override diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java index e76ad1ba8c..71bebe2990 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.utils.LoggerUtils; -import org.apache.dolphinscheduler.server.utils.SpringApplication; +import org.apache.dolphinscheduler.server.utils.SpringApplicationContext; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskProps; @@ -47,7 +47,7 @@ public class ShellCommandExecutorTest { @Before public void before(){ - processDao = SpringApplication.getBean(ProcessDao.class); + processDao = SpringApplicationContext.getBean(ProcessDao.class); } @Test diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java index 5087a45151..725f2835e9 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.utils.LoggerUtils; -import org.apache.dolphinscheduler.server.utils.SpringApplication; +import org.apache.dolphinscheduler.server.utils.SpringApplicationContext; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskProps; @@ -48,7 +48,7 @@ public class SqlExecutorTest { @Before public void before(){ - processDao = SpringApplication.getBean(ProcessDao.class); + processDao = SpringApplicationContext.getBean(ProcessDao.class); } @Test