From e0f53cfe576b7743f3cc9a3048df27f3d452aecb Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Sat, 12 Oct 2019 10:23:27 +0800 Subject: [PATCH] get processDao by DaoFactory (#994) * rename from DatasourceUserMapper to DataSourceUserMapper * add unit test in UserMapper and WorkerGroupMapper * change cn.escheduler to org.apache.dolphinscheduler * add unit test in UdfFuncMapperTest * add unit test in UdfFuncMapperTest * remove DatabaseConfiguration * add ConnectionFactoryTest * cal duration in processInstancesList * change desc to description * change table name in mysql ddl * change table name in mysql ddl * change escheduler to dolphinscheduler * change escheduler to dolphinscheduler * change escheduler to dolphinscheduler * remove log4j-1.2-api and modify AlertMapperTest * remove log4j-1.2-api * Add alertDao to spring management * Add alertDao to spring management * get SqlSessionFactory from MybatisSqlSessionFactoryBean * get processDao by DaoFactory * read druid properties in ConneciontFactory * read druid properties in ConneciontFactory --- .../dolphinscheduler/common/Constants.java | 4 +- .../dolphinscheduler/dao/MonitorDBDao.java | 2 +- .../dolphinscheduler/dao/ProcessDao.java | 16 +++++++ .../dolphinscheduler/dao/TaskRecordDao.java | 2 +- .../dao/datasource/ConnectionFactory.java | 48 ++++++++++++------- .../src/main/resources/application.properties | 41 ++++++++++++++-- .../worker/runner/TaskScheduleThread.java | 2 +- .../server/worker/task/AbstractYarnTask.java | 6 +-- .../server/worker/task/TaskManager.java | 21 ++++---- .../worker/task/dependent/DependentTask.java | 5 +- .../server/worker/task/flink/FlinkTask.java | 5 +- .../server/worker/task/http/HttpTask.java | 4 +- .../server/worker/task/mr/MapReduceTask.java | 5 +- .../task/processdure/ProcedureTask.java | 4 +- .../server/worker/task/python/PythonTask.java | 4 +- .../server/worker/task/shell/ShellTask.java | 4 +- .../server/worker/task/spark/SparkTask.java | 5 +- .../server/worker/task/sql/SqlTask.java | 8 ++-- .../shell/ShellCommandExecutorTest.java | 2 +- .../server/worker/sql/SqlExecutorTest.java | 7 +-- .../task/dependent/DependentTaskTest.java | 2 +- 21 files changed, 125 insertions(+), 72 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 060c79ad54..02c1e1c47b 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -53,7 +53,7 @@ public final class Constants { /** * dao properties path */ - public static final String DAO_PROPERTIES_PATH = "/dao/data_source.properties"; + public static final String DAO_PROPERTIES_PATH = "application.properties"; /** * fs.defaultFS @@ -470,7 +470,7 @@ public final class Constants { /** * task record configuration path */ - public static final String DATA_SOURCE_PROPERTIES = "dao/data_source.properties"; + public static final String APPLICATION_PROPERTIES = "application.properties"; public static final String TASK_RECORD_URL = "task.record.datasource.url"; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/MonitorDBDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/MonitorDBDao.java index b9a99eec31..e33b64d007 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/MonitorDBDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/MonitorDBDao.java @@ -45,7 +45,7 @@ public class MonitorDBDao { static { try { - conf = new PropertiesConfiguration(Constants.DATA_SOURCE_PROPERTIES); + conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES); }catch (ConfigurationException e){ logger.error("load configuration excetpion",e); System.exit(1); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java index 5f442d38f3..c6fa46307d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java @@ -46,6 +46,7 @@ import java.util.*; import java.util.stream.Collectors; import static org.apache.dolphinscheduler.common.Constants.*; +import static org.apache.dolphinscheduler.dao.datasource.ConnectionFactory.getMapper; /** * process relative dao that some mappers in this. @@ -118,6 +119,21 @@ public class ProcessDao extends AbstractBaseDao { @Override protected void init() { taskQueue = TaskQueueFactory.getTaskQueueInstance(); + + + userMapper = getMapper(UserMapper.class); + processDefineMapper = getMapper(ProcessDefinitionMapper.class); + processInstanceMapper = getMapper(ProcessInstanceMapper.class); + dataSourceMapper = getMapper(DataSourceMapper.class); + processInstanceMapMapper = getMapper(ProcessInstanceMapMapper.class); + taskInstanceMapper = getMapper(TaskInstanceMapper.class); + commandMapper = getMapper(CommandMapper.class); + scheduleMapper = getMapper(ScheduleMapper.class); + udfFuncMapper = getMapper(UdfFuncMapper.class); + resourceMapper = getMapper(ResourceMapper.class); + workerGroupMapper = getMapper(WorkerGroupMapper.class); + taskQueue = TaskQueueFactory.getTaskQueueInstance(); + tenantMapper = getMapper(TenantMapper.class); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java index 7981c41297..12463ed3e1 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java @@ -49,7 +49,7 @@ public class TaskRecordDao { static { try { - conf = new PropertiesConfiguration(Constants.DATA_SOURCE_PROPERTIES); + conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES); }catch (ConfigurationException e){ logger.error("load configuration excetpion",e); System.exit(1); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java index 3dbb290c34..398181e210 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java @@ -30,7 +30,6 @@ import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory; import org.mybatis.spring.SqlSessionTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import javax.sql.DataSource; @@ -50,7 +49,7 @@ public class ConnectionFactory { static { try { - conf = new PropertiesConfiguration(Constants.DATA_SOURCE_PROPERTIES); + conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES); }catch (ConfigurationException e){ logger.error("load configuration excetpion",e); System.exit(1); @@ -61,19 +60,33 @@ public class ConnectionFactory { * get the data source */ public static DruidDataSource getDataSource() { + DruidDataSource druidDataSource = new DruidDataSource(); -// Map allMap = YmlConfig.allMap; - druidDataSource.setDriverClassName(conf.getString("spring.datasource.driver-class-name")); - druidDataSource.setUrl(conf.getString("spring.datasource.url")); - druidDataSource.setUsername(conf.getString("spring.datasource.username")); - druidDataSource.setPassword(conf.getString("spring.datasource.password")); - druidDataSource.setInitialSize(5); - druidDataSource.setMinIdle(5); - druidDataSource.setMaxActive(20); - druidDataSource.setMaxWait(60000); - druidDataSource.setTimeBetweenEvictionRunsMillis(60000); - druidDataSource.setMinEvictableIdleTimeMillis(300000); - druidDataSource.setValidationQuery("SELECT 1"); + + druidDataSource.setDriverClassName(conf.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME)); + druidDataSource.setUrl(conf.getString(Constants.SPRING_DATASOURCE_URL)); + druidDataSource.setUsername(conf.getString(Constants.SPRING_DATASOURCE_USERNAME)); + druidDataSource.setPassword(conf.getString(Constants.SPRING_DATASOURCE_PASSWORD)); + druidDataSource.setValidationQuery(conf.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY)); + + druidDataSource.setPoolPreparedStatements(conf.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS)); + druidDataSource.setTestWhileIdle(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE)); + druidDataSource.setTestOnBorrow(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW)); + druidDataSource.setTestOnReturn(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN)); + druidDataSource.setKeepAlive(conf.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE)); + + druidDataSource.setMinIdle(conf.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE)); + druidDataSource.setMaxActive(conf.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE)); + druidDataSource.setMaxWait(conf.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT)); + druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(conf.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE)); + druidDataSource.setInitialSize(conf.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE)); + druidDataSource.setTimeBetweenEvictionRunsMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS)); + druidDataSource.setTimeBetweenConnectErrorMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS)); + druidDataSource.setMinEvictableIdleTimeMillis(conf.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS)); + druidDataSource.setValidationQueryTimeout(conf.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT)); + //auto commit + druidDataSource.setDefaultAutoCommit(conf.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT)); + return druidDataSource; } @@ -95,11 +108,12 @@ public class ConnectionFactory { configuration.addMappers("org.apache.dolphinscheduler.dao.mapper"); MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean(); + sqlSessionFactoryBean.setConfiguration(configuration); sqlSessionFactoryBean.setDataSource(dataSource); + sqlSessionFactoryBean.setTypeEnumsPackage("org.apache.dolphinscheduler.*.enums"); - sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:org/apache/dolphinscheduler/dao/mapper/*.xml")); - sqlSessionFactoryBean.setConfiguration(configuration); - return sqlSessionFactoryBean.getObject(); + sqlSessionFactory = sqlSessionFactoryBean.getObject(); + return sqlSessionFactory; } } } diff --git a/dolphinscheduler-dao/src/main/resources/application.properties b/dolphinscheduler-dao/src/main/resources/application.properties index a34e1c796f..fa77098925 100644 --- a/dolphinscheduler-dao/src/main/resources/application.properties +++ b/dolphinscheduler-dao/src/main/resources/application.properties @@ -1,23 +1,54 @@ # mysql # url=jdbc:postgresql://192.168.220.154:5432/dolphinscheduler +# base spring data source configuration +spring.datasource.type=com.alibaba.druid.pool.DruidDataSource +#spring.datasource.driver-class-name=org.postgresql.Driver +spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.url=jdbc:mysql://192.168.220.188:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8 spring.datasource.username=root spring.datasource.password=root@123 -spring.datasource.type=com.alibaba.druid.pool.DruidDataSource -# driver-class-name=org.postgresql.Driver -spring.datasource.driver-class-name=com.mysql.jdbc.Driver + +# connection configuration spring.datasource.initialSize=5 +# min connection number spring.datasource.minIdle=5 -spring.datasource.maxActive=20 +# max connection number +spring.datasource.maxActive=50 + +# max wait time for get a connection in milliseconds. if configuring maxWait, fair locks are enabled by default and concurrency efficiency decreases. +# If necessary, unfair locks can be used by configuring the useUnfairLock attribute to true. spring.datasource.maxWait=60000 + +# milliseconds for check to close free connections spring.datasource.timeBetweenEvictionRunsMillis=60000 + +# the Destroy thread detects the connection interval and closes the physical connection in milliseconds if the connection idle time is greater than or equal to minEvictableIdleTimeMillis. +spring.datasource.timeBetweenConnectErrorMillis=60000 + +# the longest time a connection remains idle without being evicted, in milliseconds spring.datasource.minEvictableIdleTimeMillis=300000 + +#the SQL used to check whether the connection is valid requires a query statement. If validation Query is null, testOnBorrow, testOnReturn, and testWhileIdle will not work. spring.datasource.validationQuery=SELECT 1 FROM DUAL + +#check whether the connection is valid for timeout, in seconds +spring.datasource.validationQueryTimeout=3 + +# when applying for a connection, if it is detected that the connection is idle longer than time Between Eviction Runs Millis, +# validation Query is performed to check whether the connection is valid spring.datasource.testWhileIdle=true -spring.datasource.testOnBorrow=false + +#execute validation to check if the connection is valid when applying for a connection +spring.datasource.testOnBorrow=true +#execute validation to check if the connection is valid when the connection is returned spring.datasource.testOnReturn=false +spring.datasource.defaultAutoCommit=true +spring.datasource.keepAlive=true + +# open PSCache, specify count PSCache for every connection spring.datasource.poolPreparedStatements=true spring.datasource.maxPoolPreparedStatementPerConnectionSize=20 + spring.datasource.spring.datasource.filters=stat,wall,log4j spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 66a09a6841..91da0b6d1c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -142,7 +142,7 @@ public class TaskScheduleThread implements Runnable { task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, - taskLogger,processDao); + taskLogger); // task init task.init(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java index c79c0b41cd..7af1af5318 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.slf4j.Logger; -import org.springframework.beans.factory.annotation.Autowired; import java.io.IOException; @@ -42,7 +41,6 @@ public abstract class AbstractYarnTask extends AbstractTask { /** * process database access */ - @Autowired protected ProcessDao processDao; /** @@ -50,9 +48,9 @@ public abstract class AbstractYarnTask extends AbstractTask { * @param logger * @throws IOException */ - public AbstractYarnTask(TaskProps taskProps, Logger logger,ProcessDao processDao) { + public AbstractYarnTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); - this.processDao = processDao; + this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getTaskDir(), taskProps.getTaskAppId(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java index 6373eb4351..e308a906a6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.server.worker.task.dependent.DependentTask; import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask; import org.apache.dolphinscheduler.server.worker.task.http.HttpTask; @@ -45,27 +44,27 @@ public class TaskManager { * @return * @throws IllegalArgumentException */ - public static AbstractTask newTask(String taskType, TaskProps props, Logger logger,ProcessDao processDao) + public static AbstractTask newTask(String taskType, TaskProps props, Logger logger) throws IllegalArgumentException { switch (EnumUtils.getEnum(TaskType.class,taskType)) { case SHELL: - return new ShellTask(props, logger,processDao); + return new ShellTask(props, logger); case PROCEDURE: - return new ProcedureTask(props, logger,processDao); + return new ProcedureTask(props, logger); case SQL: - return new SqlTask(props, logger,processDao); + return new SqlTask(props, logger); case MR: - return new MapReduceTask(props, logger,processDao); + return new MapReduceTask(props, logger); case SPARK: - return new SparkTask(props, logger,processDao); + return new SparkTask(props, logger); case FLINK: - return new FlinkTask(props, logger,processDao); + return new FlinkTask(props, logger); case PYTHON: - return new PythonTask(props, logger,processDao); + return new PythonTask(props, logger); case DEPENDENT: - return new DependentTask(props, logger,processDao); + return new DependentTask(props, logger); case HTTP: - return new HttpTask(props, logger,processDao); + return new HttpTask(props, logger); default: logger.error("unsupport task type: {}", taskType); throw new IllegalArgumentException("not support task type"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java index 642315f278..8510265869 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java @@ -52,9 +52,8 @@ public class DependentTask extends AbstractTask { private ProcessDao processDao; - public DependentTask(TaskProps props, Logger logger,ProcessDao processDao) { + public DependentTask(TaskProps props, Logger logger) { super(props, logger); - this.processDao = processDao; } @Override @@ -69,7 +68,7 @@ public class DependentTask extends AbstractTask { taskModel.getDependItemList(), taskModel.getRelation())); } - + this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); if(taskProps.getScheduleTime() != null){ this.dependentDate = taskProps.getScheduleTime(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java index 52991ccefd..de50c52ed6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; -import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; @@ -50,8 +49,8 @@ public class FlinkTask extends AbstractYarnTask { */ private FlinkParameters flinkParameters; - public FlinkTask(TaskProps props, Logger logger,ProcessDao processDao) { - super(props, logger,processDao); + public FlinkTask(TaskProps props, Logger logger) { + super(props, logger); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java index 801c0362a7..47f6f83158 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java @@ -75,9 +75,9 @@ public class HttpTask extends AbstractTask { protected String output; - public HttpTask(TaskProps props, Logger logger,ProcessDao processDao) { + public HttpTask(TaskProps props, Logger logger) { super(props, logger); - this.processDao = processDao; + this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java index d6134d729a..ec61643523 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; -import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; import org.apache.dolphinscheduler.server.worker.task.TaskProps; @@ -49,8 +48,8 @@ public class MapReduceTask extends AbstractYarnTask { * @param props * @param logger */ - public MapReduceTask(TaskProps props, Logger logger,ProcessDao processDao) { - super(props, logger,processDao); + public MapReduceTask(TaskProps props, Logger logger) { + super(props, logger); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java index 4f6740940f..7a6aaac289 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java @@ -64,7 +64,7 @@ public class ProcedureTask extends AbstractTask { */ private BaseDataSource baseDataSource; - public ProcedureTask(TaskProps taskProps, Logger logger,ProcessDao processDao) { + public ProcedureTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); logger.info("procedure task params {}", taskProps.getTaskParams()); @@ -76,7 +76,7 @@ public class ProcedureTask extends AbstractTask { throw new RuntimeException("procedure task params is not valid"); } - this.processDao = processDao; + this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index a219d37c1c..8a9903b09f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -59,7 +59,7 @@ public class PythonTask extends AbstractTask { private ProcessDao processDao; - public PythonTask(TaskProps taskProps, Logger logger,ProcessDao processDao) { + public PythonTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); this.taskDir = taskProps.getTaskDir(); @@ -73,7 +73,7 @@ public class PythonTask extends AbstractTask { taskProps.getTaskStartTime(), taskProps.getTaskTimeout(), logger); - this.processDao = processDao; + this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index ef6f7227e5..a7264d5977 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -61,7 +61,7 @@ public class ShellTask extends AbstractTask { private ProcessDao processDao; - public ShellTask(TaskProps taskProps, Logger logger,ProcessDao processDao) { + public ShellTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); this.taskDir = taskProps.getTaskDir(); @@ -74,7 +74,7 @@ public class ShellTask extends AbstractTask { taskProps.getTaskStartTime(), taskProps.getTaskTimeout(), logger); - this.processDao = processDao; + this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java index 1fc63bef74..2ee42160fc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.spark.SparkParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; -import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.SparkArgsUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; @@ -48,8 +47,8 @@ public class SparkTask extends AbstractYarnTask { */ private SparkParameters sparkParameters; - public SparkTask(TaskProps props, Logger logger,ProcessDao processDao) { - super(props, logger,processDao); + public SparkTask(TaskProps props, Logger logger) { + super(props, logger); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index ceb2c9e26f..73eb0d1489 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -47,7 +47,6 @@ import org.apache.dolphinscheduler.server.utils.UDFUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.slf4j.Logger; -import org.springframework.beans.factory.annotation.Autowired; import java.sql.*; import java.util.*; @@ -75,7 +74,6 @@ public class SqlTask extends AbstractTask { /** * alert dao */ - @Autowired private AlertDao alertDao; /** @@ -89,7 +87,7 @@ public class SqlTask extends AbstractTask { private BaseDataSource baseDataSource; - public SqlTask(TaskProps taskProps, Logger logger,ProcessDao processDao) { + public SqlTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); logger.info("sql task params {}", taskProps.getTaskParams()); @@ -98,8 +96,8 @@ public class SqlTask extends AbstractTask { if (!sqlParameters.checkParameters()) { throw new RuntimeException("sql task params is not valid"); } - this.processDao = processDao; -// this.alertDao = DaoFactory.getDaoInstance(AlertDao.class); + this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); + this.alertDao = DaoFactory.getDaoInstance(AlertDao.class); } @Override diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java index f5df24e19d..a75033d77c 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java @@ -79,7 +79,7 @@ public class ShellCommandExecutorTest { taskInstance.getId())); - AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger,null); + AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); logger.info("task info : {}", task); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java index d95d51b217..b97ee4d415 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.sql; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.dao.DaoFactory; @@ -55,7 +56,7 @@ public class SqlExecutorTest { String nodeName = "mysql sql test"; String taskAppId = "51_11282_263978"; String tenantCode = "hdfs"; - int taskInstId = 263978; + int taskInstId = 7; sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId); } @@ -105,7 +106,7 @@ public class SqlExecutorTest { taskProps.setTaskTimeout(360000); taskProps.setTaskInstId(taskInstId); taskProps.setNodeName(nodeName); - + taskProps.setCmdTypeIfComplement(CommandType.START_PROCESS); TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId); @@ -122,7 +123,7 @@ public class SqlExecutorTest { taskInstance.getId())); - AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger,null); + AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); logger.info("task info : {}", task); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java index 588f4379b7..3d428eab89 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java @@ -52,7 +52,7 @@ public class DependentTaskTest { taskProps.setTaskInstId(252612); taskProps.setDependence(dependString); - DependentTask dependentTask = new DependentTask(taskProps, logger,null); + DependentTask dependentTask = new DependentTask(taskProps, logger); dependentTask.init(); dependentTask.handle(); Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_FAILURE );