
fix the Spring transaction not working bug (#1252)

* move updateTaskState into try/catch block in case of exception

* fix NPE

* using conf.getInt instead of getString

* for AbstractZKClient, remove the log, since the same message is already printed in createZNodePath.
for AlertDao, correct the spelling.

* duplicate

* refactor getTaskWorkerGroupId

* add friendly log

* update heartbeat thread num = 1

* fix the bug when the worker executes a task using the queue, and remove the Tenant user check in TaskScheduleThread

* 1. move verifyTaskInstanceIsNull after taskInstance
2. keep verifyTenantIsNull/verifyTaskInstanceIsNull clean and readable

* fix the message

* delete before check to avoid KeeperException$NoNodeException

* fix the message

* check processInstance state before deleting tenant

* check processInstance state before deleting worker group

* refactor

* merge api constants into common constants

* update the resource perm

* update the dataSource perm

* fix CheckUtils.checkUserParams method

* update AlertGroupService, extends from BaseService, remove duplicate methods

* refactor

* modify method name

* add hasProjectAndPerm method

* using checkProject instead of getResultStatus

* delete checkAuth method, using hasProjectAndPerm instead.

* correct spelling

* add transactional for deleteWorkerGroupById

* add Transactional for deleteProcessInstanceById method

* change sqlSessionTemplate singleton

* change sqlSessionTemplate singleton and reformat code

* fix unsuitable error message

* update shutdownhook methods

* fix worker log bug

* fix api server debug mode bug

* upgrade zk version

* delete this line, since zkClient.close() already does the whole thing

* fix master server shutdown error

* downgrade zk version and add FourLetterWordMain class

* fix PathChildrenCache not being closed

* add Transactional for createSession method

* add more message for java-doc

* delete App, let spring manage connectionFactory

* add license

* add class Application for test support

* refactor masterServer and workerServer

* add args

* fix the Spring transaction not working bug

* remove author

* delete @Bean annotation

* rename application.properties to application-dao.properties
pull/2/head
Tboy, 5 years ago, committed by qiaozhanwei
commit f13389de52
1. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (2 changes)
2. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java (4 changes)
3. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java (33 changes)
4. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java (142 changes)
5. dolphinscheduler-dao/src/main/resources/application-dao.properties (0 changes, renamed)
6. dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/Application.java (3 changes)
7. install.sh (16 changes)

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (2 changes)

@@ -471,7 +471,7 @@ public final class Constants {
/**
* task record configuration path
*/
public static final String APPLICATION_PROPERTIES = "application.properties";
public static final String APPLICATION_PROPERTIES = "application-dao.properties";
public static final String TASK_RECORD_URL = "task.record.datasource.url";

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java (4 changes)

@@ -50,8 +50,8 @@ public class AlertDao extends AbstractBaseDao {
@Override
protected void init() {
alertMapper = ConnectionFactory.getSqlSession().getMapper(AlertMapper.class);
userAlertGroupMapper = ConnectionFactory.getSqlSession().getMapper(UserAlertGroupMapper.class);
alertMapper = ConnectionFactory.getMapper(AlertMapper.class);
userAlertGroupMapper = ConnectionFactory.getMapper(UserAlertGroupMapper.class);
}
/**

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java (33 changes)

@@ -18,11 +18,9 @@ package org.apache.dolphinscheduler.dao.datasource;
import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.config.MybatisPlusConfig;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
@@ -32,18 +30,19 @@ import org.mybatis.spring.SqlSessionTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import javax.sql.DataSource;
/**
* not spring manager connection, only use for init db, and alert module for non-spring application
* data source connection factory
*/
@Service
public class ConnectionFactory {
public class ConnectionFactory extends SpringConnectionFactory{
private static final Logger logger = LoggerFactory.getLogger(ConnectionFactory.class);
/**
* sql session factory
*/
@ -54,25 +53,10 @@ public class ConnectionFactory {
*/
private static SqlSessionTemplate sqlSessionTemplate;
/**
* Load configuration file
*/
private static org.apache.commons.configuration.Configuration conf;
static {
try {
conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES);
} catch (ConfigurationException e) {
logger.error("load configuration exception", e);
System.exit(1);
}
}
/**
* get the data source
* @return druid dataSource
*/
@Bean
public static DruidDataSource getDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
@ -105,11 +89,10 @@ public class ConnectionFactory {
}
/**
* get sql session factory
* * get sql session factory
* @return sqlSessionFactory
* @throws Exception sqlSessionFactory exception
*/
@Bean
public static SqlSessionFactory getSqlSessionFactory() throws Exception {
if (sqlSessionFactory == null) {
synchronized (ConnectionFactory.class) {
@ -123,7 +106,7 @@ public class ConnectionFactory {
configuration.setEnvironment(environment);
configuration.setLazyLoadingEnabled(true);
configuration.addMappers("org.apache.dolphinscheduler.dao.mapper");
configuration.addInterceptor(MybatisPlusConfig.paginationInterceptor());
configuration.addInterceptor(new PaginationInterceptor());
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setConfiguration(configuration);
@ -143,7 +126,6 @@ public class ConnectionFactory {
* get sql session
* @return sqlSession
*/
@Bean
public static SqlSession getSqlSession() {
if (sqlSessionTemplate == null) {
synchronized (ConnectionFactory.class) {
@ -175,4 +157,5 @@ public class ConnectionFactory {
throw new RuntimeException("get mapper failed");
}
}
}
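
The refactored ConnectionFactory keeps its static entry points for code that runs outside a Spring context (database initialization and the alert module), while the Spring-managed beans move to the new SpringConnectionFactory parent. A minimal sketch of that non-Spring usage path, mirroring the AlertDao hunk above (the holder class name is hypothetical, not part of this commit):

import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.apache.dolphinscheduler.dao.mapper.AlertMapper;

// Hypothetical non-Spring caller: obtains a MyBatis mapper through the static
// ConnectionFactory.getMapper(...) helper, the same call AlertDao.init() now uses.
public class AlertMapperHolder {

    private final AlertMapper alertMapper;

    public AlertMapperHolder() {
        // ConnectionFactory lazily builds its own SqlSessionFactory/SqlSessionTemplate,
        // so this works without any Spring application context.
        this.alertMapper = ConnectionFactory.getMapper(AlertMapper.class);
    }

    public AlertMapper getAlertMapper() {
        return alertMapper;
    }
}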

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java (142 changes)

@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.datasource;
import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
/**
* data source connection factory
*/
@Configuration
@MapperScan("org.apache.dolphinscheduler.*.mapper")
public class SpringConnectionFactory {
private static final Logger logger = LoggerFactory.getLogger(SpringConnectionFactory.class);
/**
* Load configuration file
*/
protected static org.apache.commons.configuration.Configuration conf;
static {
try {
conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES);
} catch (ConfigurationException e) {
logger.error("load configuration exception", e);
System.exit(1);
}
}
/**
* pagination interceptor
* @return pagination interceptor
*/
@Bean
public PaginationInterceptor paginationInterceptor() {
return new PaginationInterceptor();
}
/**
* get the data source
* @return druid dataSource
*/
@Bean
public DruidDataSource dataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setDriverClassName(conf.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME));
druidDataSource.setUrl(conf.getString(Constants.SPRING_DATASOURCE_URL));
druidDataSource.setUsername(conf.getString(Constants.SPRING_DATASOURCE_USERNAME));
druidDataSource.setPassword(conf.getString(Constants.SPRING_DATASOURCE_PASSWORD));
druidDataSource.setValidationQuery(conf.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY));
druidDataSource.setPoolPreparedStatements(conf.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS));
druidDataSource.setTestWhileIdle(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE));
druidDataSource.setTestOnBorrow(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW));
druidDataSource.setTestOnReturn(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN));
druidDataSource.setKeepAlive(conf.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE));
druidDataSource.setMinIdle(conf.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE));
druidDataSource.setMaxActive(conf.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE));
druidDataSource.setMaxWait(conf.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT));
druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(conf.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE));
druidDataSource.setInitialSize(conf.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE));
druidDataSource.setTimeBetweenEvictionRunsMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS));
druidDataSource.setTimeBetweenConnectErrorMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS));
druidDataSource.setMinEvictableIdleTimeMillis(conf.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS));
druidDataSource.setValidationQueryTimeout(conf.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT));
//auto commit
druidDataSource.setDefaultAutoCommit(conf.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT));
return druidDataSource;
}
/**
* * get transaction manager
* @return DataSourceTransactionManager
*/
@Bean
public DataSourceTransactionManager transactionManager() {
return new DataSourceTransactionManager(dataSource());
}
/**
* * get sql session factory
* @return sqlSessionFactory
* @throws Exception sqlSessionFactory exception
*/
@Bean
public SqlSessionFactory sqlSessionFactory() throws Exception {
MybatisConfiguration configuration = new MybatisConfiguration();
configuration.addMappers("org.apache.dolphinscheduler.dao.mapper");
configuration.addInterceptor(paginationInterceptor());
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setConfiguration(configuration);
sqlSessionFactoryBean.setDataSource(dataSource());
sqlSessionFactoryBean.setTypeEnumsPackage("org.apache.dolphinscheduler.*.enums");
return sqlSessionFactoryBean.getObject();
}
/**
* get sql session
* @return sqlSession
*/
@Bean
public SqlSession sqlSession() throws Exception{
return new SqlSessionTemplate(sqlSessionFactory());
}
}
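
Because SpringConnectionFactory now registers the DruidDataSource, SqlSessionFactory, SqlSessionTemplate, and a DataSourceTransactionManager as Spring beans, @Transactional annotations (for example on the deleteWorkerGroupById and deleteProcessInstanceById methods mentioned in the commit message) are finally backed by a real transaction manager. A minimal sketch of the behaviour this enables, assuming a hypothetical service class and a WorkerGroupMapper that extends the MyBatis-Plus BaseMapper:

import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; // assumed mapper location
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Hypothetical service sketch: with DataSourceTransactionManager registered by
// SpringConnectionFactory, the @Transactional proxy opens a real transaction,
// so an exception thrown inside the method rolls the delete back.
@Service
public class WorkerGroupDemoService {

    @Autowired
    private WorkerGroupMapper workerGroupMapper;

    @Transactional(rollbackFor = Exception.class)
    public void deleteWorkerGroupById(int id) {
        workerGroupMapper.deleteById(id);
        // any exception raised after this point causes a rollback,
        // which is exactly what did not happen before this fix
    }
}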

dolphinscheduler-dao/src/main/resources/application.properties → dolphinscheduler-dao/src/main/resources/application-dao.properties (0 changes, renamed)

dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/Application.java (3 changes)

@@ -20,9 +20,6 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
/**
* @Author: Tboy
*/
@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler.dao")
public class Application {
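
The Application class added for test support gives the DAO module a Spring Boot entry point, so mapper tests can run against the beans defined in SpringConnectionFactory instead of the static ConnectionFactory path. An illustrative bootstrap for such a test, assuming JUnit 4 and that the test sits in the same package as Application (the test class itself is not part of this commit):

package org.apache.dolphinscheduler.dao.mapper;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

// Illustrative smoke test: starts the DAO Spring context declared by Application,
// so @Autowired mappers would resolve against the SpringConnectionFactory beans.
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ApplicationContextSmokeTest {

    @Test
    public void contextLoads() {
        // if the context cannot start (e.g. the datasource in application-dao.properties
        // is misconfigured), SpringRunner fails the test before this body runs
    }
}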

install.sh (16 changes)

@@ -301,10 +301,10 @@ apiMaxHttpPostSize="5000000"
# 1,replace file
echo "1,replace file"
if [ $dbtype == "mysql" ];then
sed -i ${txt} "s#spring.datasource.url.*#spring.datasource.url=jdbc:mysql://${dbhost}/${dbname}?characterEncoding=UTF-8#g" conf/application.properties
sed -i ${txt} "s#spring.datasource.username.*#spring.datasource.username=${username}#g" conf/application.properties
sed -i ${txt} "s#spring.datasource.password.*#spring.datasource.password=${passowrd}#g" conf/application.properties
sed -i ${txt} "s#spring.datasource.driver-class-name.*#spring.datasource.driver-class-name=com.mysql.jdbc.Driver#g" conf/application.properties
sed -i ${txt} "s#spring.datasource.url.*#spring.datasource.url=jdbc:mysql://${dbhost}/${dbname}?characterEncoding=UTF-8#g" conf/application-dao.properties
sed -i ${txt} "s#spring.datasource.username.*#spring.datasource.username=${username}#g" conf/application-dao.properties
sed -i ${txt} "s#spring.datasource.password.*#spring.datasource.password=${passowrd}#g" conf/application-dao.properties
sed -i ${txt} "s#spring.datasource.driver-class-name.*#spring.datasource.driver-class-name=com.mysql.jdbc.Driver#g" conf/application-dao.properties
sed -i ${txt} "s#org.quartz.dataSource.myDs.URL.*#org.quartz.dataSource.myDs.URL=jdbc:mysql://${dbhost}/${dbname}?characterEncoding=UTF-8#g" conf/quartz.properties
@@ -314,10 +314,10 @@ if [ $dbtype == "mysql" ];then
fi
if [ $dbtype == "postgresql" ];then
sed -i ${txt} "s#spring.datasource.url.*#spring.datasource.url=jdbc:postgresql://${dbhost}/${dbname}?characterEncoding=UTF-8#g" conf/application.properties
sed -i ${txt} "s#spring.datasource.username.*#spring.datasource.username=${username}#g" conf/application.properties
sed -i ${txt} "s#spring.datasource.password.*#spring.datasource.password=${passowrd}#g" conf/application.properties
sed -i ${txt} "s#spring.datasource.driver-class-name.*#spring.datasource.driver-class-name=org.postgresql.Driver#g" conf/application.properties
sed -i ${txt} "s#spring.datasource.url.*#spring.datasource.url=jdbc:postgresql://${dbhost}/${dbname}?characterEncoding=UTF-8#g" conf/application-dao.properties
sed -i ${txt} "s#spring.datasource.username.*#spring.datasource.username=${username}#g" conf/application-dao.properties
sed -i ${txt} "s#spring.datasource.password.*#spring.datasource.password=${passowrd}#g" conf/application-dao.properties
sed -i ${txt} "s#spring.datasource.driver-class-name.*#spring.datasource.driver-class-name=org.postgresql.Driver#g" conf/application-dao.properties
sed -i ${txt} "s#org.quartz.dataSource.myDs.URL.*#org.quartz.dataSource.myDs.URL=jdbc:postgresql://${dbhost}/${dbname}?characterEncoding=UTF-8#g" conf/quartz.properties
sed -i ${txt} "s#org.quartz.dataSource.myDs.user.*#org.quartz.dataSource.myDs.user=${username}#g" conf/quartz.properties
