
Fix shell tools for database schema that cannot run (#7003)

3.0.0/version-upgrade
kezhenxu94 committed 3 years ago via GitHub
parent commit ef0cb46661
  1. docker/build/startup-init-conf.sh (11 changed lines)
  2. dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SchemaUtilsTest.java (119 changed lines)
  3. dolphinscheduler-dao/pom.xml (15 changed lines)
  4. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java (140 changed lines)
  5. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java (23 changed lines)
  6. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java (90 changed lines)
  7. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/MysqlUpgradeDao.java (36 changed lines)
  8. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/PostgresqlUpgradeDao.java (66 changed lines)
  9. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/SchemaUtils.java (70 changed lines)
  10. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java (199 changed lines)
  11. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/CreateDolphinScheduler.java (60 changed lines)
  12. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/InitDolphinScheduler.java (49 changed lines)
  13. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/UpgradeDolphinScheduler.java (56 changed lines)
  14. dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.0_schema/postgresql/dolphinscheduler_ddl.sql (2 changed lines)
  15. dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.0_schema/postgresql/dolphinscheduler_ddl_post.sql (3 changed lines)
  16. dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/BaseDaoTest.java (3 changed lines)
  17. dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ConnectionFactoryTest.java (39 changed lines)
  18. dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDaoTest.java (15 changed lines)
  19. dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/SchemaUtilsTest.java (60 changed lines)
  20. dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDaoTest.java (40 changed lines)
  21. dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDaoTest.java (10 changed lines)
  22. dolphinscheduler-dist/src/main/assembly/dolphinscheduler-bin.xml (2 changed lines)
  23. dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/tenant/docker-compose.yaml (2 changed lines)
  24. script/create-dolphinscheduler.sh (6 changed lines)
  25. script/dolphinscheduler-daemon.sh (12 changed lines)
  26. script/upgrade-dolphinscheduler.sh (6 changed lines)
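
All three schema shell tools (items 11-13 above) are rebuilt around the same pattern: a Spring Boot launcher that activates a tool-specific profile plus the shared "shell-cli" profile, and a profile-gated CommandLineRunner that does the work through DolphinSchedulerManager. The sketch below condenses that pattern from the CreateDolphinScheduler diff further down; the other two tools differ only in their profile name and in which manager methods the runner calls.

import org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.quartz.QuartzAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@ComponentScan(value = "org.apache.dolphinscheduler.dao")
@EnableAutoConfiguration(exclude = {QuartzAutoConfiguration.class})
public class CreateDolphinScheduler {

    public static void main(String[] args) {
        new SpringApplicationBuilder(CreateDolphinScheduler.class)
                .profiles("shell-create", "shell-cli")  // "shell-cli" enables the @Profile("shell-cli") DAO beans
                .web(WebApplicationType.NONE)           // plain CLI run, no web server
                .run(args);
    }

    @Component
    @Profile("shell-create")                            // only instantiated for this particular tool
    static class CreateRunner implements CommandLineRunner {

        private final DolphinSchedulerManager dolphinSchedulerManager;

        CreateRunner(DolphinSchedulerManager dolphinSchedulerManager) {
            this.dolphinSchedulerManager = dolphinSchedulerManager;
        }

        @Override
        public void run(String... args) throws Exception {
            dolphinSchedulerManager.initDolphinScheduler();
            dolphinSchedulerManager.upgradeDolphinScheduler();
        }
    }
}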

11
docker/build/startup-init-conf.sh

@ -26,13 +26,10 @@ echo "init env variables"
# Database
#============================================================================
export DATABASE_TYPE=${DATABASE_TYPE:-"postgresql"}
export DATABASE_DRIVER=${DATABASE_DRIVER:-"org.postgresql.Driver"}
export DATABASE_HOST=${DATABASE_HOST:-"127.0.0.1"}
export DATABASE_PORT=${DATABASE_PORT:-"5432"}
export DATABASE_USERNAME=${DATABASE_USERNAME:-"root"}
export DATABASE_PASSWORD=${DATABASE_PASSWORD:-"root"}
export DATABASE_DATABASE=${DATABASE_DATABASE:-"dolphinscheduler"}
export DATABASE_PARAMS=${DATABASE_PARAMS:-"characterEncoding=utf8"}
# export SPRING_DATASOURCE_URL=${SPRING_DATASOURCE_URL:-"jdbc:postgresql://localhost:5432/dolphinscheduler"}
# export SPRING_DATASOURCE_USERNAME=${DATABASE_USERNAME:-"root"}
# export SPRING_DATASOURCE_PASSWORD=${DATABASE_PASSWORD:-"root"}
#============================================================================
# Registry

119
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SchemaUtilsTest.java

@ -1,119 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import static org.apache.commons.collections.CollectionUtils.isEqualCollection;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.LoggerFactory;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ LoggerFactory.class, FileUtils.class })
public class SchemaUtilsTest {
@Test
public void testReplaceBlank() {
Assert.assertEquals("abc", SchemaUtils.replaceBlank(" abc"));
Assert.assertEquals("abc", SchemaUtils.replaceBlank("abc "));
Assert.assertEquals("abc", SchemaUtils.replaceBlank("a b c"));
Assert.assertEquals("abc", SchemaUtils.replaceBlank("a b c"));
Assert.assertEquals("", SchemaUtils.replaceBlank(" "));
Assert.assertEquals("", SchemaUtils.replaceBlank(null));
Assert.assertEquals("我怕的你", SchemaUtils.replaceBlank("我怕的 你"));
}
@Test
public void testGetSoftVersion() {
// file not found
try {
SchemaUtils.getSoftVersion();
} catch (RuntimeException e) {
Assert.assertEquals("Failed to get the product version description file. The file could not be found",
e.getMessage());
}
// file exists, fmt is invalid
FileUtils.writeContent2File("32432423", "sql/soft_version");
Assert.assertEquals("32432423", SchemaUtils.getSoftVersion());
}
@Test
public void testIsAGreatVersion() {
// param is null
try {
SchemaUtils.isAGreatVersion(null, null);
} catch (RuntimeException e) {
Assert.assertEquals("schemaVersion or version is empty", e.getMessage());
}
// param is ""
try {
SchemaUtils.isAGreatVersion("", "");
} catch (RuntimeException e) {
Assert.assertEquals("schemaVersion or version is empty", e.getMessage());
}
Assert.assertFalse(SchemaUtils.isAGreatVersion("1", "1"));
Assert.assertTrue(SchemaUtils.isAGreatVersion("2", "1"));
Assert.assertTrue(SchemaUtils.isAGreatVersion("1.1", "1"));
Assert.assertTrue(SchemaUtils.isAGreatVersion("1.1", "1.0.1"));
Assert.assertFalse(SchemaUtils.isAGreatVersion("1.1", "1.2"));
Assert.assertTrue(SchemaUtils.isAGreatVersion("1.1.1", "1.1"));
Assert.assertTrue(SchemaUtils.isAGreatVersion("10.1.1", "1.01.100"));
try {
SchemaUtils.isAGreatVersion("10.1.1", ".1");
} catch (Exception e) {
Assert.assertNotNull(e);
}
try {
SchemaUtils.isAGreatVersion("a.1.1", "b.1");
} catch (Exception e) {
Assert.assertNotNull(e);
}
}
@Test
public void testGetAllSchemaList() {
//normal
PowerMockito.mockStatic(FileUtils.class);
File[] files = new File[4];
files[0] = new File("sql/upgrade/1.2.0_schema");
files[1] = new File("sql/upgrade/1.0.1_schema");
files[2] = new File("sql/upgrade/1.0.2_schema");
files[3] = new File("sql/upgrade/1.1.0_schema");
PowerMockito.when(FileUtils.getAllDir("sql/upgrade")).thenReturn(files);
List<String> real = SchemaUtils.getAllSchemaList();
List<String> expect = Arrays.asList("1.0.1_schema", "1.0.2_schema",
"1.1.0_schema", "1.2.0_schema");
boolean result = isEqualCollection(real, expect);
Assert.assertTrue(result);
//normal
files = new File[0];
PowerMockito.when(FileUtils.getAllDir("sql/upgrade")).thenReturn(files);
real = SchemaUtils.getAllSchemaList();
Assert.assertNull(real);
}
}

15
dolphinscheduler-dao/pom.xml

@ -115,4 +115,19 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<excludes>
<exclude>sql/</exclude>
<exclude>*.yaml</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

140
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java

@ -1,140 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.datasource;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.transaction.TransactionFactory;
import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.mybatis.spring.SqlSessionTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
/**
* not spring manager connection, only use for init db, and alert module for non-spring application
* data source connection factory
*/
public class ConnectionFactory extends SpringConnectionFactory {
private static final Logger logger = LoggerFactory.getLogger(ConnectionFactory.class);
private static class ConnectionFactoryHolder {
private static final ConnectionFactory connectionFactory = new ConnectionFactory();
}
public static ConnectionFactory getInstance() {
return ConnectionFactoryHolder.connectionFactory;
}
private ConnectionFactory() {
try {
sqlSessionFactory = getSqlSessionFactory();
sqlSessionTemplate = getSqlSessionTemplate();
} catch (Exception e) {
logger.error("Initializing ConnectionFactory error", e);
throw new RuntimeException(e);
}
}
/**
* sql session factory
*/
private SqlSessionFactory sqlSessionFactory;
/**
* sql session template
*/
private SqlSessionTemplate sqlSessionTemplate;
private DataSource dataSource;
// TODO remove
public DataSource getDataSource() {
return dataSource;
}
/**
* * get sql session factory
*
* @return sqlSessionFactory
* @throws Exception sqlSessionFactory exception
*/
@Bean
public SqlSessionFactory getSqlSessionFactory() throws Exception {
TransactionFactory transactionFactory = new JdbcTransactionFactory();
Environment environment = new Environment("development", transactionFactory, getDataSource());
MybatisConfiguration configuration = new MybatisConfiguration();
configuration.setEnvironment(environment);
configuration.setLazyLoadingEnabled(true);
configuration.addMappers("org.apache.dolphinscheduler.dao.mapper");
configuration.addInterceptor(new PaginationInterceptor());
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setConfiguration(configuration);
sqlSessionFactoryBean.setDataSource(getDataSource());
sqlSessionFactoryBean.setTypeEnumsPackage("org.apache.dolphinscheduler.*.enums");
sqlSessionFactory = sqlSessionFactoryBean.getObject();
return sqlSessionFactory;
}
private SqlSessionTemplate getSqlSessionTemplate() {
sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory);
return sqlSessionTemplate;
}
/**
* get sql session
*
* @return sqlSession
*/
public SqlSession getSqlSession() {
return sqlSessionTemplate;
}
/**
* get mapper
*
* @param type target class
* @param <T> generic
* @return target object
*/
public <T> T getMapper(Class<T> type) {
try {
return getSqlSession().getMapper(type);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("get mapper failed");
}
}
}

23
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java

@ -42,32 +42,16 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
@Configuration
public class SpringConnectionFactory {
/**
* pagination interceptor
*
* @return pagination interceptor
*/
@Bean
public PaginationInterceptor paginationInterceptor() {
return new PaginationInterceptor();
}
/**
* * get transaction manager
*
* @return DataSourceTransactionManager
*/
@Bean
public DataSourceTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
/**
* * get sql session factory
*
* @return sqlSessionFactory
* @throws Exception sqlSessionFactory exception
*/
@Bean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
MybatisConfiguration configuration = new MybatisConfiguration();
@ -76,6 +60,7 @@ public class SpringConnectionFactory {
configuration.setCallSettersOnNulls(true);
configuration.setJdbcTypeForNull(JdbcType.NULL);
configuration.addInterceptor(paginationInterceptor());
configuration.setGlobalConfig(new GlobalConfig().setBanner(false));
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setConfiguration(configuration);
sqlSessionFactoryBean.setDataSource(dataSource);
@ -93,11 +78,6 @@ public class SpringConnectionFactory {
return sqlSessionFactoryBean.getObject();
}
/**
* get sql session
*
* @return SqlSession
*/
@Bean
public SqlSession sqlSession(SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
@ -113,5 +93,4 @@ public class SpringConnectionFactory {
databaseIdProvider.setProperties(properties);
return databaseIdProvider;
}
}

90
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java

@ -17,102 +17,84 @@
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.SchemaUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.io.IOException;
import java.sql.Connection;
import java.util.List;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
/**
* upgrade manager
*/
@Service
@Profile("shell-cli")
public class DolphinSchedulerManager {
private static final Logger logger = LoggerFactory.getLogger(DolphinSchedulerManager.class);
UpgradeDao upgradeDao;
/**
* init upgrade dao
*/
private void initUpgradeDao() {
DbType dbType = UpgradeDao.getDbType();
if (dbType != null) {
switch (dbType) {
case MYSQL:
upgradeDao = MysqlUpgradeDao.getInstance();
break;
case POSTGRESQL:
upgradeDao = PostgresqlUpgradeDao.getInstance();
break;
default:
logger.error("not support sql type: {},can't upgrade", dbType);
throw new IllegalArgumentException("not support sql type,can't upgrade");
}
}
private final UpgradeDao upgradeDao;
public DolphinSchedulerManager(DataSource dataSource, List<UpgradeDao> daos) throws Exception {
final DbType type = getCurrentDbType(dataSource);
upgradeDao = daos.stream()
.filter(it -> it.getDbType() == type)
.findFirst()
.orElseThrow(() -> new RuntimeException(
"Cannot find UpgradeDao implementation for db type: " + type
));
}
/**
* constructor init
*/
public DolphinSchedulerManager() {
initUpgradeDao();
private DbType getCurrentDbType(DataSource dataSource) throws Exception {
try (Connection conn = dataSource.getConnection()) {
String name = conn.getMetaData().getDatabaseProductName().toUpperCase();
return DbType.valueOf(name);
}
}
/**
* init DolphinScheduler
*/
public void initDolphinScheduler() {
// Determines whether the dolphinscheduler table structure has been init
if (upgradeDao.isExistsTable("t_escheduler_version") ||
upgradeDao.isExistsTable("t_ds_version") ||
upgradeDao.isExistsTable("t_escheduler_queue")) {
if (upgradeDao.isExistsTable("t_escheduler_version")
|| upgradeDao.isExistsTable("t_ds_version")
|| upgradeDao.isExistsTable("t_escheduler_queue")) {
logger.info("The database has been initialized. Skip the initialization step");
return;
}
this.initDolphinSchedulerSchema();
}
/**
* init DolphinScheduler Schema
*/
public void initDolphinSchedulerSchema() {
logger.info("Start initializing the DolphinScheduler manager table structure");
upgradeDao.initSchema();
}
/**
* upgrade DolphinScheduler
*/
public void upgradeDolphinScheduler() {
public void upgradeDolphinScheduler() throws IOException {
// Gets a list of all upgrades
List<String> schemaList = SchemaUtils.getAllSchemaList();
if(schemaList == null || schemaList.size() == 0) {
if (schemaList == null || schemaList.size() == 0) {
logger.info("There is no schema to upgrade!");
}else {
String version = "";
} else {
String version;
// Gets the version of the current system
if (upgradeDao.isExistsTable("t_escheduler_version")) {
version = upgradeDao.getCurrentVersion("t_escheduler_version");
}else if(upgradeDao.isExistsTable("t_ds_version")){
} else if (upgradeDao.isExistsTable("t_ds_version")) {
version = upgradeDao.getCurrentVersion("t_ds_version");
}else if(upgradeDao.isExistsColumn("t_escheduler_queue","create_time")){
} else if (upgradeDao.isExistsColumn("t_escheduler_queue", "create_time")) {
version = "1.0.1";
}else if(upgradeDao.isExistsTable("t_escheduler_queue")){
} else if (upgradeDao.isExistsTable("t_escheduler_queue")) {
version = "1.0.0";
}else{
} else {
logger.error("Unable to determine current software version, so cannot upgrade");
throw new RuntimeException("Unable to determine current software version, so cannot upgrade");
}
// The target version of the upgrade
String schemaVersion = "";
for(String schemaDir : schemaList) {
for (String schemaDir : schemaList) {
schemaVersion = schemaDir.split("_")[0];
if(SchemaUtils.isAGreatVersion(schemaVersion , version)) {
if (SchemaUtils.isAGreatVersion(schemaVersion, version)) {
logger.info("upgrade DolphinScheduler metadata version from {} to {}", version, schemaVersion);
logger.info("Begin upgrading DolphinScheduler's table structure");
upgradeDao.upgradeDolphinScheduler(schemaDir);
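
The constructor above replaces the old static initUpgradeDao(): instead of reading a configured database type it asks the live JDBC connection for its product name and picks the matching UpgradeDao from the injected list. A minimal stand-alone illustration of that name-to-enum mapping; the product string is hard-coded here, whereas the real code reads it from conn.getMetaData().getDatabaseProductName():

import org.apache.dolphinscheduler.spi.enums.DbType;

public class DbTypeLookupSketch {

    public static void main(String[] args) {
        // JDBC drivers typically report "MySQL" / "PostgreSQL"; upper-casing the name is
        // assumed to line up with the DbType constant names, which is what the manager relies on.
        String productName = "PostgreSQL";
        DbType type = DbType.valueOf(productName.toUpperCase());
        System.out.println(type.name()); // POSTGRESQL -> PostgresqlUpgradeDao gets selected
    }
}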

36
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/MysqlUpgradeDao.java

@ -17,38 +17,38 @@
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* mysql upgrade dao
*/
public class MysqlUpgradeDao extends UpgradeDao {
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
@Service
@Profile("shell-cli")
public class MysqlUpgradeDao extends UpgradeDao {
public static final Logger logger = LoggerFactory.getLogger(MysqlUpgradeDao.class);
/**
* mysql upgrade dao holder
*/
private static class MysqlUpgradeDaoHolder {
private static final MysqlUpgradeDao INSTANCE = new MysqlUpgradeDao();
private MysqlUpgradeDao(DataSource dataSource) {
super(dataSource);
}
/**
* mysql upgrade dao constructor
*/
private MysqlUpgradeDao() {
@Override
protected String initSqlPath() {
return "create/release-1.0.0_schema/mysql";
}
public static final MysqlUpgradeDao getInstance() {
return MysqlUpgradeDaoHolder.INSTANCE;
@Override
protected DbType getDbType() {
return DbType.MYSQL;
}
/**
* determines whether a table exists
* @param tableName tableName

66
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/PostgresqlUpgradeDao.java

@ -17,44 +17,40 @@
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* postgresql upgrade dao
*/
public class PostgresqlUpgradeDao extends UpgradeDao {
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
@Service
@Profile("shell-cli")
public class PostgresqlUpgradeDao extends UpgradeDao {
public static final Logger logger = LoggerFactory.getLogger(PostgresqlUpgradeDao.class);
private static final String SCHEMA = getSchema();
/**
* postgresql upgrade dao holder
*/
private static class PostgresqlUpgradeDaoHolder {
private static final PostgresqlUpgradeDao INSTANCE = new PostgresqlUpgradeDao();
private PostgresqlUpgradeDao(DataSource dataSource) {
super(dataSource);
}
/**
* PostgresqlUpgradeDao Constructor
*/
private PostgresqlUpgradeDao() {
@Override
protected String initSqlPath() {
return "create/release-1.2.0_schema/postgresql";
}
public static final PostgresqlUpgradeDao getInstance() {
return PostgresqlUpgradeDaoHolder.INSTANCE;
@Override
protected DbType getDbType() {
return DbType.POSTGRESQL;
}
/**
* getSchema
* @return schema
*/
public static String getSchema(){
public String getSchema() {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet resultSet = null;
@ -62,14 +58,14 @@ public class PostgresqlUpgradeDao extends UpgradeDao {
conn = dataSource.getConnection();
pstmt = conn.prepareStatement("select current_schema()");
resultSet = pstmt.executeQuery();
while (resultSet.next()){
if(resultSet.isFirst()){
while (resultSet.next()) {
if (resultSet.isFirst()) {
return resultSet.getString(1);
}
}
} catch (SQLException e) {
logger.error(e.getMessage(),e);
logger.error(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(resultSet, pstmt, conn);
}
@ -79,6 +75,7 @@ public class PostgresqlUpgradeDao extends UpgradeDao {
/**
* determines whether a table exists
*
* @param tableName tableName
* @return if table exist return true, else return false
*/
@ -89,12 +86,12 @@ public class PostgresqlUpgradeDao extends UpgradeDao {
try {
conn = dataSource.getConnection();
rs = conn.getMetaData().getTables(conn.getCatalog(), SCHEMA, tableName, null);
rs = conn.getMetaData().getTables(conn.getCatalog(), getSchema(), tableName, null);
return rs.next();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(rs, conn);
}
@ -103,21 +100,22 @@ public class PostgresqlUpgradeDao extends UpgradeDao {
/**
* determines whether a field exists in the specified table
*
* @param tableName tableName
* @param columnName columnName
* @return if column name exist return true, else return false
* @return if column name exist return true, else return false
*/
@Override
public boolean isExistsColumn(String tableName,String columnName) {
public boolean isExistsColumn(String tableName, String columnName) {
Connection conn = null;
ResultSet rs = null;
try {
conn = dataSource.getConnection();
rs = conn.getMetaData().getColumns(conn.getCatalog(), SCHEMA,tableName,columnName);
rs = conn.getMetaData().getColumns(conn.getCatalog(), getSchema(), tableName, columnName);
return rs.next();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(rs, conn);

70
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SchemaUtils.java → dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/SchemaUtils.java

@ -15,55 +15,48 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.commons.lang.StringUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import com.google.common.base.Strings;
/**
* Metadata related common classes
*/
public class SchemaUtils {
private static final Logger logger = LoggerFactory.getLogger(SchemaUtils.class);
private static final Pattern p = Pattern.compile("\\s*|\t|\r|\n");
private SchemaUtils() {
throw new UnsupportedOperationException("Construct SchemaUtils");
}
/**
* Gets upgradable schemas for all upgrade directories
*
* @return all schema list
*/
public static List<String> getAllSchemaList() {
List<String> schemaDirList = new ArrayList<>();
File[] schemaDirArr = FileUtils.getAllDir("sql/upgrade");
if (schemaDirArr == null || schemaDirArr.length == 0) {
return null;
}
public static List<String> getAllSchemaList() throws IOException {
final File[] schemaDirArr = new ClassPathResource("sql/upgrade").getFile().listFiles();
for (File file : schemaDirArr) {
schemaDirList.add(file.getName());
if (schemaDirArr == null || schemaDirArr.length == 0) {
return Collections.emptyList();
}
schemaDirList.sort((o1, o2) -> {
return Arrays.stream(schemaDirArr).map(File::getName).sorted((o1, o2) -> {
try {
String dir1 = String.valueOf(o1);
String dir2 = String.valueOf(o2);
String version1 = dir1.split("_")[0];
String version2 = dir2.split("_")[0];
String version1 = o1.split("_")[0];
String version2 = o2.split("_")[0];
if (version1.equals(version2)) {
return 0;
}
@ -73,14 +66,11 @@ public class SchemaUtils {
}
return -1;
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
});
return schemaDirList;
}).collect(Collectors.toList());
}
/**
@ -115,11 +105,12 @@ public class SchemaUtils {
*
* @return current software version
*/
public static String getSoftVersion() {
public static String getSoftVersion() throws IOException {
final ClassPathResource softVersionFile = new ClassPathResource("sql/soft_version");
String softVersion;
try {
softVersion = FileUtils.readFile2Str(new FileInputStream("sql/soft_version"));
softVersion = replaceBlank(softVersion);
softVersion = FileUtils.readFile2Str(softVersionFile.getInputStream());
softVersion = Strings.nullToEmpty(softVersion).replaceAll("\\s+|\r|\n", "");
} catch (FileNotFoundException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("Failed to get the product version description file. The file could not be found", e);
@ -127,19 +118,4 @@ public class SchemaUtils {
return softVersion;
}
/**
* Strips the string of space carriage returns and tabs
*
* @param str string
* @return string removed blank
*/
public static String replaceBlank(String str) {
String dest = "";
if (str != null) {
Matcher m = p.matcher(str);
dest = m.replaceAll("");
}
return dest;
}
}
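
getAllSchemaList() now reads the upgrade directories from the classpath and returns them sorted so upgrades apply oldest-first. A small stand-alone illustration of the expected ordering; the directory names are examples, and compareVersion() below is a simplified stand-in for the comparator shown above:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SchemaOrderSketch {

    public static void main(String[] args) {
        // Hypothetical upgrade directories found under sql/upgrade on the classpath.
        List<String> dirs = Arrays.asList("1.2.0_schema", "1.0.1_schema", "1.1.0_schema", "1.0.2_schema");

        // Compare the part before '_' as a version, mirroring SchemaUtils.getAllSchemaList().
        List<String> sorted = dirs.stream()
                .sorted((a, b) -> compareVersion(a.split("_")[0], b.split("_")[0]))
                .collect(Collectors.toList());

        System.out.println(sorted); // [1.0.1_schema, 1.0.2_schema, 1.1.0_schema, 1.2.0_schema]
    }

    // Numeric, segment-by-segment version comparison ("1.10" sorts after "1.2").
    private static int compareVersion(String v1, String v2) {
        String[] a = v1.split("\\.");
        String[] b = v2.split("\\.");
        for (int i = 0; i < Math.max(a.length, b.length); i++) {
            int x = i < a.length ? Integer.parseInt(a[i]) : 0;
            int y = i < b.length ? Integer.parseInt(b[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }
}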

199
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java

@ -28,9 +28,7 @@ import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SchemaUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
@ -40,16 +38,14 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@ -62,6 +58,8 @@ import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
@ -70,102 +68,38 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
public abstract class UpgradeDao {
public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class);
private static final String T_VERSION_NAME = "t_escheduler_version";
private static final String T_NEW_VERSION_NAME = "t_ds_version";
private static final String rootDir = System.getProperty("user.dir");
protected static final DataSource dataSource = getDataSource();
private static final DbType dbType = getCurrentDbType();
public static DataSource getDataSource(){
return ConnectionFactory.getInstance().getDataSource();
}
protected final DataSource dataSource;
/**
* get db type
* @return dbType
*/
public static DbType getDbType(){
return dbType;
protected UpgradeDao(DataSource dataSource) {
this.dataSource = dataSource;
}
/**
* get current dbType
* @return
*/
private static DbType getCurrentDbType(){
Connection conn = null;
try {
conn = dataSource.getConnection();
String name = conn.getMetaData().getDatabaseProductName().toUpperCase();
return DbType.valueOf(name);
} catch (Exception e) {
logger.error(e.getMessage(),e);
return null;
}finally {
ConnectionUtils.releaseResource(conn);
}
}
protected abstract String initSqlPath();
/**
* init schema
*/
public void initSchema() {
DbType dbType = getDbType();
String initSqlPath = "";
if (dbType != null) {
switch (dbType) {
case MYSQL:
initSqlPath = "/sql/create/release-1.0.0_schema/mysql/";
initSchema(initSqlPath);
break;
case POSTGRESQL:
initSqlPath = "/sql/create/release-1.2.0_schema/postgresql/";
initSchema(initSqlPath);
break;
default:
logger.error("not support sql type: {},can't upgrade", dbType);
throw new IllegalArgumentException("not support sql type,can't upgrade");
}
}
}
/**
* init schema
*
* @param initSqlPath initSqlPath
*/
public void initSchema(String initSqlPath) {
protected abstract DbType getDbType();
public void initSchema() {
// Execute the dolphinscheduler DDL, it cannot be rolled back
runInitDDL(initSqlPath);
runInitDDL();
// Execute the dolphinscheduler DML, it can be rolled back
runInitDML(initSqlPath);
runInitDML();
}
/**
* run DML
*
* @param initSqlPath initSqlPath
*/
private void runInitDML(String initSqlPath) {
private void runInitDML() {
Resource mysqlSQLFilePath = new ClassPathResource("sql/" + initSqlPath() + "/dolphinscheduler_dml.sql");
Connection conn = null;
if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not found");
}
String mysqlSQLFilePath = rootDir + initSqlPath + "dolphinscheduler_dml.sql";
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
// Execute the dolphinscheduler_dml.sql script to import related data of dolphinscheduler
ScriptRunner initScriptRunner = new ScriptRunner(conn, false, true);
Reader initSqlReader = new FileReader(mysqlSQLFilePath);
Reader initSqlReader = new InputStreamReader(mysqlSQLFilePath.getInputStream());
initScriptRunner.runScript(initSqlReader);
conn.commit();
@ -189,68 +123,27 @@ public abstract class UpgradeDao {
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
/**
* run DDL
*
* @param initSqlPath initSqlPath
*/
private void runInitDDL(String initSqlPath) {
Connection conn = null;
if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not found");
}
String mysqlSQLFilePath = rootDir + initSqlPath + "dolphinscheduler_ddl.sql";
try {
conn = dataSource.getConnection();
private void runInitDDL() {
Resource mysqlSQLFilePath = new ClassPathResource("sql/" + initSqlPath() + "/dolphinscheduler_ddl.sql");
try (Connection conn = dataSource.getConnection()) {
// Execute the dolphinscheduler_ddl.sql script to create the table structure of dolphinscheduler
ScriptRunner initScriptRunner = new ScriptRunner(conn, true, true);
Reader initSqlReader = new FileReader(mysqlSQLFilePath);
Reader initSqlReader = new InputStreamReader(mysqlSQLFilePath.getInputStream());
initScriptRunner.runScript(initSqlReader);
} catch (IOException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
/**
* determines whether a table exists
*
* @param tableName tableName
* @return if table exist return true, else return false
*/
public abstract boolean isExistsTable(String tableName);
/**
* determines whether a field exists in the specified table
*
* @param tableName tableName
* @param columnName columnName
* @return if column name exist return true, else return false
*/
public abstract boolean isExistsColumn(String tableName, String columnName);
/**
* get current version
*
* @param versionName versionName
* @return version
*/
public String getCurrentVersion(String versionName) {
String sql = String.format("select version from %s", versionName);
Connection conn = null;
@ -276,18 +169,11 @@ public abstract class UpgradeDao {
}
}
/**
* upgrade DolphinScheduler
*
* @param schemaDir schema dir
*/
public void upgradeDolphinScheduler(String schemaDir) {
upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl.sql");
upgradeDolphinSchedulerDML(schemaDir);
}
/**
* upgrade DolphinScheduler worker group
* ds-1.3.0 modify the worker group for process definition json
@ -330,7 +216,7 @@ public abstract class UpgradeDao {
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
ObjectNode workerGroupNode = (ObjectNode) task.path("workerGroupId");
Integer workerGroupId = -1;
int workerGroupId = -1;
if (workerGroupNode != null && workerGroupNode.canConvertToInt()) {
workerGroupId = workerGroupNode.asInt(-1);
}
@ -355,9 +241,6 @@ public abstract class UpgradeDao {
}
}
/**
* updateProcessDefinitionJsonResourceList
*/
protected void updateProcessDefinitionJsonResourceList() {
ResourceDao resourceDao = new ResourceDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
@ -415,17 +298,9 @@ public abstract class UpgradeDao {
}
/**
* upgradeDolphinScheduler DML
*
* @param schemaDir schemaDir
*/
private void upgradeDolphinSchedulerDML(String schemaDir) {
String schemaVersion = schemaDir.split("_")[0];
if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not found");
}
String sqlFilePath = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/dolphinscheduler_dml.sql", rootDir, schemaDir, getDbType().name().toLowerCase());
Resource sqlFilePath = new ClassPathResource(String.format("sql/upgrade/%s/%s/dolphinscheduler_dml.sql", schemaDir, getDbType().name().toLowerCase()));
logger.info("sqlSQLFilePath" + sqlFilePath);
Connection conn = null;
PreparedStatement pstmt = null;
@ -434,7 +309,7 @@ public abstract class UpgradeDao {
conn.setAutoCommit(false);
// Execute the upgraded dolphinscheduler dml
ScriptRunner scriptRunner = new ScriptRunner(conn, false, true);
Reader sqlReader = new FileReader(new File(sqlFilePath));
Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream());
scriptRunner.runScript(sqlReader);
if (isExistsTable(T_VERSION_NAME)) {
// Change version in the version table to the new version
@ -466,16 +341,6 @@ public abstract class UpgradeDao {
}
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (SQLException e) {
try {
if (null != conn) {
conn.rollback();
}
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
try {
if (null != conn) {
@ -498,10 +363,7 @@ public abstract class UpgradeDao {
* @param schemaDir schemaDir
*/
private void upgradeDolphinSchedulerDDL(String schemaDir, String scriptFile) {
if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not found");
}
String sqlFilePath = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/{3}", rootDir, schemaDir, getDbType().name().toLowerCase(), scriptFile);
Resource sqlFilePath = new ClassPathResource(String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile));
Connection conn = null;
PreparedStatement pstmt = null;
try {
@ -511,23 +373,14 @@ public abstract class UpgradeDao {
conn.setAutoCommit(true);
// Execute the dolphinscheduler ddl.sql for the upgrade
ScriptRunner scriptRunner = new ScriptRunner(conn, true, true);
Reader sqlReader = new FileReader(new File(sqlFilePath));
Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream());
scriptRunner.runScript(sqlReader);
} catch (FileNotFoundException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql file not found ", e);
} catch (IOException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
@ -612,7 +465,7 @@ public abstract class UpgradeDao {
List<TaskDefinitionLog> taskDefinitionLogs,
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) throws Exception {
Map<Integer, ProcessDefinition> processDefinitionMap = processDefinitions.stream()
.collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition));
.collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition));
Date now = new Date();
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
if (entry.getValue() == null) {
@ -763,8 +616,8 @@ public abstract class UpgradeDao {
}
public void convertDependence(List<TaskDefinitionLog> taskDefinitionLogs,
Map<Integer, Long> projectIdCodeMap,
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) {
Map<Integer, Long> projectIdCodeMap,
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) {
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) {
ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());

60
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/CreateDolphinScheduler.java

@ -18,32 +18,46 @@
package org.apache.dolphinscheduler.dao.upgrade.shell;
import org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.quartz.QuartzAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
/**
* create DolphinScheduler
*
*/
@ComponentScan(value = "org.apache.dolphinscheduler.dao")
@EnableAutoConfiguration(exclude = {QuartzAutoConfiguration.class})
public class CreateDolphinScheduler {
public static void main(String[] args) {
new SpringApplicationBuilder(CreateDolphinScheduler.class)
.profiles("shell-create", "shell-cli")
.web(WebApplicationType.NONE)
.run(args);
}
@Component
@Profile("shell-create")
static class CreateRunner implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(CreateRunner.class);
private final DolphinSchedulerManager dolphinSchedulerManager;
CreateRunner(DolphinSchedulerManager dolphinSchedulerManager) {
this.dolphinSchedulerManager = dolphinSchedulerManager;
}
private static final Logger logger = LoggerFactory.getLogger(CreateDolphinScheduler.class);
/**
* create dolphin scheduler db
* @param args args
*/
public static void main(String[] args) {
DolphinSchedulerManager dolphinSchedulerManager = new DolphinSchedulerManager();
try {
dolphinSchedulerManager.initDolphinScheduler();
logger.info("init DolphinScheduler finished");
dolphinSchedulerManager.upgradeDolphinScheduler();
logger.info("upgrade DolphinScheduler finished");
logger.info("create DolphinScheduler success");
} catch (Exception e) {
logger.error("create DolphinScheduler failed",e);
}
}
@Override
public void run(String... args) throws Exception {
dolphinSchedulerManager.initDolphinScheduler();
logger.info("init DolphinScheduler finished");
dolphinSchedulerManager.upgradeDolphinScheduler();
logger.info("upgrade DolphinScheduler finished");
logger.info("create DolphinScheduler success");
}
}
}

49
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/InitDolphinScheduler.java

@ -17,26 +17,43 @@
package org.apache.dolphinscheduler.dao.upgrade.shell;
import org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.quartz.QuartzAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
/**
* init DolphinScheduler
*
*/
@ComponentScan(value = "org.apache.dolphinscheduler.dao")
@EnableAutoConfiguration(exclude = {QuartzAutoConfiguration.class})
public class InitDolphinScheduler {
public static void main(String[] args) {
new SpringApplicationBuilder(InitDolphinScheduler.class)
.profiles("shell-init", "shell-cli")
.web(WebApplicationType.NONE)
.run(args);
}
@Component
@Profile("shell-init")
static class InitRunner implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(InitRunner.class);
private final DolphinSchedulerManager dolphinSchedulerManager;
private static final Logger logger = LoggerFactory.getLogger(InitDolphinScheduler.class);
InitRunner(DolphinSchedulerManager dolphinSchedulerManager) {
this.dolphinSchedulerManager = dolphinSchedulerManager;
}
/**
* init dolphin scheduler db
* @param args args
*/
public static void main(String[] args) {
Thread.currentThread().setName("manager-InitDolphinScheduler");
DolphinSchedulerManager dolphinSchedulerManager = new DolphinSchedulerManager();
dolphinSchedulerManager.initDolphinScheduler();
logger.info("init DolphinScheduler finished");
}
@Override
public void run(String... args) throws Exception {
dolphinSchedulerManager.initDolphinScheduler();
logger.info("init DolphinScheduler finished");
}
}
}

56
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/UpgradeDolphinScheduler.java

@ -17,31 +17,43 @@
package org.apache.dolphinscheduler.dao.upgrade.shell;
import org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.quartz.QuartzAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
/**
* upgrade DolphinScheduler
*/
@ComponentScan(value = "org.apache.dolphinscheduler.dao")
@EnableAutoConfiguration(exclude = {QuartzAutoConfiguration.class})
public class UpgradeDolphinScheduler {
private static final Logger logger = LoggerFactory.getLogger(UpgradeDolphinScheduler.class);
public static void main(String[] args) {
new SpringApplicationBuilder(UpgradeDolphinScheduler.class)
.profiles("shell-upgrade", "shell-cli")
.web(WebApplicationType.NONE)
.run(args);
}
@Component
@Profile("shell-upgrade")
static class UpgradeRunner implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(UpgradeRunner.class);
private final DolphinSchedulerManager dolphinSchedulerManager;
UpgradeRunner(DolphinSchedulerManager dolphinSchedulerManager) {
this.dolphinSchedulerManager = dolphinSchedulerManager;
}
/**
* upgrade dolphin scheduler db
* @param args args
*/
public static void main(String[] args) {
DolphinSchedulerManager dolphinSchedulerManager = new DolphinSchedulerManager();
try {
dolphinSchedulerManager.upgradeDolphinScheduler();
logger.info("upgrade DolphinScheduler success");
} catch (Exception e) {
logger.error(e.getMessage(),e);
logger.info("Upgrade DolphinScheduler failed");
throw new RuntimeException(e);
}
}
@Override
public void run(String... args) throws Exception {
dolphinSchedulerManager.upgradeDolphinScheduler();
logger.info("upgrade DolphinScheduler success");
}
}
}

2
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.0_schema/postgresql/dolphinscheduler_ddl.sql

@ -15,6 +15,7 @@
* limitations under the License.
*/
delimiter d//
CREATE OR REPLACE FUNCTION public.dolphin_update_metadata(
)
RETURNS character varying
@ -316,3 +317,4 @@ BEGIN
END;
$BODY$;
d//

3
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.0_schema/postgresql/dolphinscheduler_ddl_post.sql

@ -16,7 +16,6 @@
*/
ALTER TABLE "t_ds_process_definition" DROP CONSTRAINT "t_ds_process_definition_pkey";
ALTER TABLE "t_ds_process_definition" ADD CONSTRAINT "t_ds_process_definition_pkey" PRIMARY KEY ("id","code");
ALTER TABLE "t_ds_process_definition" DROP CONSTRAINT "process_definition_unique";
DROP INDEX "process_definition_index";
ALTER TABLE "t_ds_process_definition" DROP "process_definition_json";
@ -24,4 +23,4 @@ ALTER TABLE "t_ds_process_definition" DROP "connects";
ALTER TABLE "t_ds_process_definition" DROP "receivers";
ALTER TABLE "t_ds_process_definition" DROP "receivers_cc";
ALTER TABLE "t_ds_process_definition" DROP "modify_by";
ALTER TABLE "t_ds_process_definition" DROP "resource_ids";
ALTER TABLE "t_ds_process_definition" DROP "resource_ids";

3
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/BaseDaoTest.java

@ -18,20 +18,17 @@
package org.apache.dolphinscheduler.dao;
import org.apache.dolphinscheduler.common.enums.ProfileType;
import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles(value = ProfileType.H2)
@ContextConfiguration(classes = SpringConnectionFactory.class)
@Transactional
@Rollback
public abstract class BaseDaoTest {

39
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ConnectionFactoryTest.java

@ -1,39 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import java.sql.Connection;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class ConnectionFactoryTest {
@BeforeClass
public static void setup() {
System.setProperty("spring.profiles.active", "h2");
}
@Test
public void testConnection() throws Exception {
Connection connection = ConnectionFactory.getInstance().getDataSource().getConnection();
Assert.assertTrue(connection != null);
}
}

15
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDaoTest.java

@ -17,26 +17,21 @@
package org.apache.dolphinscheduler.dao.upgrade;
import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ActiveProfiles;
@ActiveProfiles("h2")
public class ProcessDefinitionDaoTest {
static DataSource dataSource;
@Autowired
private DataSource dataSource;
final ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
@BeforeClass
public static void seuUp() {
System.setProperty("spring.profiles.active", "h2");
dataSource = getDataSource();
}
@Test
public void testQueryAllProcessDefinition() {
//Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());

60
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/SchemaUtilsTest.java

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.junit.Assert;
import org.junit.Test;
public class SchemaUtilsTest {
@Test
public void testIsAGreatVersion() {
// param is null
try {
SchemaUtils.isAGreatVersion(null, null);
} catch (RuntimeException e) {
Assert.assertEquals("schemaVersion or version is empty", e.getMessage());
}
// param is ""
try {
SchemaUtils.isAGreatVersion("", "");
} catch (RuntimeException e) {
Assert.assertEquals("schemaVersion or version is empty", e.getMessage());
}
Assert.assertFalse(SchemaUtils.isAGreatVersion("1", "1"));
Assert.assertTrue(SchemaUtils.isAGreatVersion("2", "1"));
Assert.assertTrue(SchemaUtils.isAGreatVersion("1.1", "1"));
Assert.assertTrue(SchemaUtils.isAGreatVersion("1.1", "1.0.1"));
Assert.assertFalse(SchemaUtils.isAGreatVersion("1.1", "1.2"));
Assert.assertTrue(SchemaUtils.isAGreatVersion("1.1.1", "1.1"));
Assert.assertTrue(SchemaUtils.isAGreatVersion("10.1.1", "1.01.100"));
try {
SchemaUtils.isAGreatVersion("10.1.1", ".1");
Assert.fail("Should fail");
} catch (Exception ignored) {
// This is expected
}
try {
SchemaUtils.isAGreatVersion("a.1.1", "b.1");
Assert.fail("Should fail");
} catch (Exception ignored) {
// This is expected
}
}
}

40
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDaoTest.java

@ -1,40 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.junit.BeforeClass;
import org.junit.Test;
public class UpgradeDaoTest {
private PostgresqlUpgradeDao postgresqlUpgradeDao;
@BeforeClass
public static void setUpClass() {
System.setProperty("spring.profiles.active", "h2");
}
@Test
public void setUp() {
postgresqlUpgradeDao = PostgresqlUpgradeDao.getInstance();
}
@Test
public void testQueryQueryAllOldWorkerGroup() throws Exception{
//postgresqlUpgradeDao.updateProcessDefinitionJsonWorkerGroup();
}
}

10
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDaoTest.java

@ -16,8 +16,6 @@
*/
package org.apache.dolphinscheduler.dao.upgrade;
import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
@ -25,11 +23,12 @@ import java.util.Map;
import javax.sql.DataSource;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
public class WorkerGroupDaoTest {
@Autowired
protected DataSource dataSource;
@BeforeClass
@ -37,11 +36,6 @@ public class WorkerGroupDaoTest {
System.setProperty("spring.profiles.active", "h2");
}
@Before
public void setup() {
dataSource = getDataSource();
}
@Test
public void testQueryQueryAllOldWorkerGroup() throws Exception {
WorkerGroupDao workerGroupDao = new WorkerGroupDao();

2
dolphinscheduler-dist/src/main/assembly/dolphinscheduler-bin.xml

@ -147,7 +147,7 @@
<includes>
<include>**/*</include>
</includes>
<outputDirectory>./sql</outputDirectory>
<outputDirectory>./sql/sql</outputDirectory> <!-- make sure the hierarchy is the same as in src/resources -->
</fileSet>
<fileSet>

2
dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/tenant/docker-compose.yaml

@ -21,6 +21,8 @@ services:
dolphinscheduler:
image: apache/dolphinscheduler:ci
command: [ standalone-server ]
environment:
DATABASE_TYPE: h2
expose:
- 12345
networks:

6
script/create-dolphinscheduler.sh

@ -22,16 +22,20 @@ DOLPHINSCHEDULER_HOME=$BIN_DIR/..
export JAVA_HOME=$JAVA_HOME
export DATABASE_TYPE=${DATABASE_TYPE:-"h2"}
export SPRING_PROFILES_ACTIVE=${SPRING_PROFILES_ACTIVE:-"default"}
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},${DATABASE_TYPE}"
export DOLPHINSCHEDULER_CONF_DIR=$DOLPHINSCHEDULER_HOME/conf
export DOLPHINSCHEDULER_LIB_JARS=$DOLPHINSCHEDULER_HOME/lib/*
export DOLPHINSCHEDULER_SQL_DIR=$DOLPHINSCHEDULER_HOME/sql
export DOLPHINSCHEDULER_OPTS="-server -Xms64m -Xmx64m -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=64m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
export STOP_TIMEOUT=5
CLASS=org.apache.dolphinscheduler.dao.upgrade.shell.CreateDolphinScheduler
exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS"
exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_SQL_DIR:$DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS"
cd $DOLPHINSCHEDULER_HOME
$JAVA_HOME/bin/java $exec_command
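
The classpath change above is what makes the ClassPathResource lookups in UpgradeDao work from a binary distribution: $DOLPHINSCHEDULER_HOME/sql is now on the classpath, and the assembly nests the files one level deeper (./sql/sql) so they keep the same sql/... prefix they have under src/main/resources. A rough sketch of the lookup the tool ends up doing; the concrete path is the MySQL 1.0.0 schema referenced by initSqlPath() earlier:

import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

public class ClasspathSqlLookupSketch {

    public static void main(String[] args) throws Exception {
        // Resolved against the classpath; with $DOLPHINSCHEDULER_HOME/sql on it, the file is expected at
        // $DOLPHINSCHEDULER_HOME/sql/sql/create/release-1.0.0_schema/mysql/dolphinscheduler_ddl.sql on disk.
        Resource ddl = new ClassPathResource("sql/create/release-1.0.0_schema/mysql/dolphinscheduler_ddl.sql");
        System.out.println("found on classpath: " + ddl.exists());

        if (ddl.exists()) {
            try (Reader reader = new InputStreamReader(ddl.getInputStream(), StandardCharsets.UTF_8)) {
                // UpgradeDao hands a reader like this to its ScriptRunner; here we only prove it opens.
                System.out.println("first char: " + (char) reader.read());
            }
        }
    }
}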

12
script/dolphinscheduler-daemon.sh

@ -59,7 +59,7 @@ cd $DOLPHINSCHEDULER_HOME
export DOLPHINSCHEDULER_OPTS="-server -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -Xss512k -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -Xloggc:$DOLPHINSCHEDULER_LOG_DIR/gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof -XshowSettings:vm $DOLPHINSCHEDULER_OPTS"
export dbtype=${dbtype:-"h2"}
export DATABASE_TYPE=${DATABASE_TYPE:-"h2"}
export SPRING_PROFILES_ACTIVE=${SPRING_PROFILES_ACTIVE:-"default"}
if [ "$command" = "api-server" ]; then
@ -67,32 +67,32 @@ if [ "$command" = "api-server" ]; then
CLASS=org.apache.dolphinscheduler.api.ApiApplicationServer
HEAP_OPTS="-Xms1g -Xmx1g -Xmn512m"
export DOLPHINSCHEDULER_OPTS="$HEAP_OPTS $DOLPHINSCHEDULER_OPTS $API_SERVER_OPTS"
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},api,${dbtype}"
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},api,${DATABASE_TYPE}"
elif [ "$command" = "master-server" ]; then
LOG_FILE="-Dlogging.config=classpath:logback-master.xml"
CLASS=org.apache.dolphinscheduler.server.master.MasterServer
HEAP_OPTS="-Xms4g -Xmx4g -Xmn2g"
export DOLPHINSCHEDULER_OPTS="$HEAP_OPTS $DOLPHINSCHEDULER_OPTS $MASTER_SERVER_OPTS"
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},master,${dbtype}"
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},master,${DATABASE_TYPE}"
elif [ "$command" = "worker-server" ]; then
LOG_FILE="-Dlogging.config=classpath:logback-worker.xml"
CLASS=org.apache.dolphinscheduler.server.worker.WorkerServer
HEAP_OPTS="-Xms2g -Xmx2g -Xmn1g"
export DOLPHINSCHEDULER_OPTS="$HEAP_OPTS $DOLPHINSCHEDULER_OPTS $WORKER_SERVER_OPTS"
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},worker,${dbtype}"
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},worker,${DATABASE_TYPE}"
elif [ "$command" = "alert-server" ]; then
LOG_FILE="-Dlogback.configurationFile=conf/logback-alert.xml"
CLASS=org.apache.dolphinscheduler.alert.AlertServer
HEAP_OPTS="-Xms1g -Xmx1g -Xmn512m"
export DOLPHINSCHEDULER_OPTS="$HEAP_OPTS $DOLPHINSCHEDULER_OPTS $ALERT_SERVER_OPTS"
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},alert,${dbtype}"
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},alert,${DATABASE_TYPE}"
elif [ "$command" = "logger-server" ]; then
CLASS=org.apache.dolphinscheduler.server.log.LoggerServer
HEAP_OPTS="-Xms1g -Xmx1g -Xmn512m"
export DOLPHINSCHEDULER_OPTS="$HEAP_OPTS $DOLPHINSCHEDULER_OPTS $LOGGER_SERVER_OPTS"
elif [ "$command" = "standalone-server" ]; then
CLASS=org.apache.dolphinscheduler.server.StandaloneServer
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},standalone,${dbtype}"
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},standalone,${DATABASE_TYPE}"
else
echo "Error: No command named '$command' was found."
exit 1

6
script/upgrade-dolphinscheduler.sh

@ -22,16 +22,20 @@ DOLPHINSCHEDULER_HOME=$BIN_DIR/..
export JAVA_HOME=$JAVA_HOME
export DATABASE_TYPE=${DATABASE_TYPE:-"h2"}
export SPRING_PROFILES_ACTIVE=${SPRING_PROFILES_ACTIVE:-"default"}
export SPRING_PROFILES_ACTIVE="${SPRING_PROFILES_ACTIVE},${DATABASE_TYPE}"
export DOLPHINSCHEDULER_CONF_DIR=$DOLPHINSCHEDULER_HOME/conf
export DOLPHINSCHEDULER_LIB_JARS=$DOLPHINSCHEDULER_HOME/lib/*
export DOLPHINSCHEDULER_SQL_DIR=$DOLPHINSCHEDULER_HOME/sql
export DOLPHINSCHEDULER_OPTS="-server -Xms64m -Xmx64m -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=64m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
export STOP_TIMEOUT=5
CLASS=org.apache.dolphinscheduler.dao.upgrade.shell.UpgradeDolphinScheduler
exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS"
exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_SQL_DIR:$DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS"
cd $DOLPHINSCHEDULER_HOME
$JAVA_HOME/bin/java $exec_command
