Browse Source

[Improvement][DAO] CreateDolphinScheduler (#5357) (#5358)

* [Improvement][DAO] Create CreateDolphinScheduler (#5357)

* add ut

* fix code smell
pull/3/MERGE
ruanwenjun 4 years ago committed by GitHub
parent
commit
b108ac43b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      dolphinscheduler-dao/pom.xml
  2. 11
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java
  3. 137
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
  4. 34
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/CreateDolphinScheduler.java
  5. 21
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/InitDolphinScheduler.java
  6. 69
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDaoTest.java
  7. 40
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/shell/CreateDolphinSchedulerTest.java
  8. 40
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/shell/InitDolphinSchedulerTest.java
  9. 2
      pom.xml

17
dolphinscheduler-dao/pom.xml

@ -151,5 +151,22 @@
<groupId>org.yaml</groupId> <groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId> <artifactId>snakeyaml</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies> </dependencies>
</project> </project>

11
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java

@ -14,14 +14,19 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.upgrade; package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.SchemaUtils; import org.apache.dolphinscheduler.common.utils.SchemaUtils;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.List;
/** /**
* upgrade manager * upgrade manager
@ -60,7 +65,7 @@ public class DolphinSchedulerManager {
/** /**
* init DolphinScheduler * init DolphinScheduler
*/ */
public void initDolphinScheduler() { public void initDolphinScheduler() throws SQLException, IOException {
// Determines whether the dolphinscheduler table structure has been init // Determines whether the dolphinscheduler table structure has been init
if (upgradeDao.isExistsTable("t_escheduler_version") || if (upgradeDao.isExistsTable("t_escheduler_version") ||
upgradeDao.isExistsTable("t_ds_version") || upgradeDao.isExistsTable("t_ds_version") ||
@ -74,7 +79,7 @@ public class DolphinSchedulerManager {
/** /**
* init DolphinScheduler Schema * init DolphinScheduler Schema
*/ */
public void initDolphinSchedulerSchema() { public void initDolphinSchedulerSchema() throws SQLException, IOException {
logger.info("Start initializing the DolphinScheduler manager table structure"); logger.info("Start initializing the DolphinScheduler manager table structure");
upgradeDao.initSchema(); upgradeDao.initSchema();

137
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java

@ -14,20 +14,25 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.upgrade; package org.apache.dolphinscheduler.dao.upgrade;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SchemaUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AbstractBaseDao; import org.apache.dolphinscheduler.dao.AbstractBaseDao;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory; import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource; import java.io.File;
import java.io.*; import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
@ -38,6 +43,14 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
public abstract class UpgradeDao extends AbstractBaseDao { public abstract class UpgradeDao extends AbstractBaseDao {
public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class); public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class);
@ -47,6 +60,9 @@ public abstract class UpgradeDao extends AbstractBaseDao {
protected static final DataSource dataSource = getDataSource(); protected static final DataSource dataSource = getDataSource();
private static final DbType dbType = getCurrentDbType(); private static final DbType dbType = getCurrentDbType();
private static final String MYSQL_CREATE_SCRIPT = rootDir + "/sql/dolphinscheduler_mysql.sql";
private static final String POSTGRE_CREATE_SCRIPT = rootDir + "/sql/dolphinscheduler_postgre.sql";
@Override @Override
protected void init() { protected void init() {
@ -90,119 +106,40 @@ public abstract class UpgradeDao extends AbstractBaseDao {
/** /**
* init schema * init schema
*/ */
public void initSchema() { public void initSchema() throws SQLException, IOException {
DbType dbType = getDbType(); DbType dbType = getDbType();
String initSqlPath = ""; String initSqlPath = "";
if (dbType != null) { if (dbType != null) {
switch (dbType) { switch (dbType) {
case MYSQL: case MYSQL:
initSqlPath = "/sql/create/release-1.0.0_schema/mysql/"; initSqlPath = MYSQL_CREATE_SCRIPT;
initSchema(initSqlPath);
break; break;
case POSTGRESQL: case POSTGRESQL:
initSqlPath = "/sql/create/release-1.2.0_schema/postgresql/"; initSqlPath = POSTGRE_CREATE_SCRIPT;
initSchema(initSqlPath);
break; break;
default: default:
logger.error("not support sql type: {},can't upgrade", dbType); logger.error("not support sql type: {},can't upgrade", dbType);
throw new IllegalArgumentException("not support sql type,can't upgrade"); throw new IllegalArgumentException("not support sql type,can't upgrade");
} }
} }
}
/**
* init scheam
*
* @param initSqlPath initSqlPath
*/
public void initSchema(String initSqlPath) {
// Execute the dolphinscheduler DDL, it cannot be rolled back
runInitDDL(initSqlPath);
// Execute the dolphinscheduler DML, it can be rolled back
runInitDML(initSqlPath);
}
/**
* run DML
*
* @param initSqlPath initSqlPath
*/
private void runInitDML(String initSqlPath) {
Connection conn = null;
if (StringUtils.isEmpty(rootDir)) { if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not found"); throw new RuntimeException("Environment variable user.dir not found");
} }
String mysqlSQLFilePath = rootDir + initSqlPath + "dolphinscheduler_dml.sql"; logger.info("Init sql filePath: {}", initSqlPath);
try { try (Connection conn = dataSource.getConnection()) {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
// Execute the dolphinscheduler_dml.sql script to import related data of dolphinscheduler
ScriptRunner initScriptRunner = new ScriptRunner(conn, false, true);
Reader initSqlReader = new FileReader(new File(mysqlSQLFilePath));
initScriptRunner.runScript(initSqlReader);
conn.commit();
} catch (IOException e) {
try { try {
conn.setAutoCommit(false);
ScriptRunner initScriptRunner = new ScriptRunner(conn, false, true);
Reader initSqlReader = new FileReader(initSqlPath);
initScriptRunner.runScript(initSqlReader);
conn.commit();
} catch (IOException | SQLException e) {
conn.rollback(); conn.rollback();
} catch (SQLException e1) { logger.error("execute init script error.", e);
logger.error(e1.getMessage(), e1); throw e;
} }
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
try {
if (null != conn) {
conn.rollback();
}
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally { } finally {
ConnectionUtils.releaseResource(conn); // ignore
}
}
/**
* run DDL
*
* @param initSqlPath initSqlPath
*/
private void runInitDDL(String initSqlPath) {
Connection conn = null;
if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not found");
}
//String mysqlSQLFilePath = rootDir + "/sql/create/release-1.0.0_schema/mysql/dolphinscheduler_ddl.sql";
String mysqlSQLFilePath = rootDir + initSqlPath + "dolphinscheduler_ddl.sql";
try {
conn = dataSource.getConnection();
// Execute the dolphinscheduler_ddl.sql script to create the table structure of dolphinscheduler
ScriptRunner initScriptRunner = new ScriptRunner(conn, true, true);
Reader initSqlReader = new FileReader(new File(mysqlSQLFilePath));
initScriptRunner.runScript(initSqlReader);
} catch (IOException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(conn);
} }
} }

34
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/CreateDolphinScheduler.java

@ -14,35 +14,35 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.upgrade.shell; package org.apache.dolphinscheduler.dao.upgrade.shell;
import org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager; import org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* create DolphinScheduler * create DolphinScheduler
*
*/ */
public class CreateDolphinScheduler { public class CreateDolphinScheduler {
private static final Logger logger = LoggerFactory.getLogger(CreateDolphinScheduler.class); private static final Logger logger = LoggerFactory.getLogger(CreateDolphinScheduler.class);
/** /**
* create dolphin scheduler db * create dolphin scheduler db
* @param args args *
*/ * @param args args
public static void main(String[] args) { */
DolphinSchedulerManager dolphinSchedulerManager = new DolphinSchedulerManager(); public static void main(String[] args) {
try { DolphinSchedulerManager dolphinSchedulerManager = new DolphinSchedulerManager();
dolphinSchedulerManager.initDolphinScheduler(); try {
logger.info("init DolphinScheduler finished"); logger.info("create DolphinScheduler begin");
dolphinSchedulerManager.upgradeDolphinScheduler(); dolphinSchedulerManager.initDolphinScheduler();
logger.info("upgrade DolphinScheduler finished"); logger.info("create DolphinScheduler success");
logger.info("create DolphinScheduler success"); } catch (Exception e) {
} catch (Exception e) { logger.error("create DolphinScheduler failed", e);
logger.error("create DolphinScheduler failed",e); }
}
} }
} }

21
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/InitDolphinScheduler.java

@ -14,29 +14,34 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.upgrade.shell; package org.apache.dolphinscheduler.dao.upgrade.shell;
import org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager; import org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* init DolphinScheduler * init DolphinScheduler
*
*/ */
public class InitDolphinScheduler { public class InitDolphinScheduler {
private static final Logger logger = LoggerFactory.getLogger(InitDolphinScheduler.class); private static final Logger logger = LoggerFactory.getLogger(InitDolphinScheduler.class);
/** /**
* init dolphin scheduler db * init dolphin scheduler db
* @param args args * @param args args
*/ */
public static void main(String[] args) { public static void main(String[] args) {
Thread.currentThread().setName("manager-InitDolphinScheduler"); Thread.currentThread().setName("manager-InitDolphinScheduler");
DolphinSchedulerManager dolphinSchedulerManager = new DolphinSchedulerManager(); DolphinSchedulerManager dolphinSchedulerManager = new DolphinSchedulerManager();
dolphinSchedulerManager.initDolphinScheduler(); try {
logger.info("init DolphinScheduler finished"); dolphinSchedulerManager.initDolphinScheduler();
logger.info("init DolphinScheduler finished");
} catch (Exception ex) {
logger.error("init DolphinScheduler error", ex);
}
} }
} }

69
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDaoTest.java

@ -14,23 +14,76 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.upgrade; package org.apache.dolphinscheduler.dao.upgrade;
import org.junit.Test; import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.util.Map;
import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource; import org.junit.Assert;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import org.junit.Test;
import static org.junit.Assert.assertThat; import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ConnectionFactory.class, ScriptRunner.class, FileReader.class})
public class UpgradeDaoTest { public class UpgradeDaoTest {
PostgresqlUpgradeDao postgresqlUpgradeDao = PostgresqlUpgradeDao.getInstance();
@Test @Test
public void testQueryQueryAllOldWorkerGroup() throws Exception{ public void testGetCurrentVersion() throws SQLException {
postgresqlUpgradeDao.updateProcessDefinitionJsonWorkerGroup(); PowerMockito.mockStatic(ConnectionFactory.class);
ConnectionFactory mockConnectionFactory = PowerMockito.mock(ConnectionFactory.class);
PowerMockito.when(ConnectionFactory.getInstance()).thenReturn(mockConnectionFactory);
DataSource mockDatasource = PowerMockito.mock(DataSource.class);
PowerMockito.when(mockConnectionFactory.getDataSource()).thenReturn(mockDatasource);
Connection mockConnection = PowerMockito.mock(Connection.class);
PowerMockito.when(mockDatasource.getConnection()).thenReturn(mockConnection);
PreparedStatement mockPrepareStatement = PowerMockito.mock(PreparedStatement.class);
PowerMockito.when(mockConnection.prepareStatement(Mockito.any())).thenReturn(mockPrepareStatement);
ResultSet mockResultSet = PowerMockito.mock(ResultSet.class);
PowerMockito.when(mockPrepareStatement.executeQuery()).thenReturn(mockResultSet);
DatabaseMetaData mockMetaData = PowerMockito.mock(DatabaseMetaData.class);
PowerMockito.when(mockConnection.getMetaData()).thenReturn(mockMetaData);
PowerMockito.when(mockMetaData.getDatabaseProductName()).thenReturn("mysql");
UpgradeDao upgradeDao = MysqlUpgradeDao.getInstance();
upgradeDao.getCurrentVersion("xx");
Assert.assertTrue(true);
} }
@Test(expected = IOException.class)
public void testInitSchema() throws Exception {
PowerMockito.mockStatic(ConnectionFactory.class);
ConnectionFactory mockConnectionFactory = PowerMockito.mock(ConnectionFactory.class);
PowerMockito.when(ConnectionFactory.getInstance()).thenReturn(mockConnectionFactory);
DataSource mockDatasource = PowerMockito.mock(DataSource.class);
PowerMockito.when(mockConnectionFactory.getDataSource()).thenReturn(mockDatasource);
Connection mockConnection = PowerMockito.mock(Connection.class);
PowerMockito.when(mockDatasource.getConnection()).thenReturn(mockConnection);
DatabaseMetaData mockMetaData = PowerMockito.mock(DatabaseMetaData.class);
PowerMockito.when(mockConnection.getMetaData()).thenReturn(mockMetaData);
PowerMockito.when(mockMetaData.getDatabaseProductName()).thenReturn("mysql");
UpgradeDao upgradeDao = MysqlUpgradeDao.getInstance();
upgradeDao.initSchema();
Assert.assertTrue(true);
}
} }

40
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/shell/CreateDolphinSchedulerTest.java

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade.shell;
import org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({DolphinSchedulerManager.class, CreateDolphinScheduler.class})
public class CreateDolphinSchedulerTest {
@Test
public void mainTest() throws Exception {
DolphinSchedulerManager mockManager = PowerMockito.mock(DolphinSchedulerManager.class);
PowerMockito.whenNew(DolphinSchedulerManager.class).withNoArguments().thenReturn(mockManager);
CreateDolphinScheduler.main(null);
Assert.assertTrue(true);
}
}

40
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/shell/InitDolphinSchedulerTest.java

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade.shell;
import org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({DolphinSchedulerManager.class, InitDolphinScheduler.class})
public class InitDolphinSchedulerTest {
@Test
public void main() throws Exception {
DolphinSchedulerManager mockManager = PowerMockito.mock(DolphinSchedulerManager.class);
PowerMockito.whenNew(DolphinSchedulerManager.class).withNoArguments().thenReturn(mockManager);
InitDolphinScheduler.main(null);
Assert.assertTrue(true);
}
}

2
pom.xml

@ -900,6 +900,8 @@
<include>**/dao/datasource/MySQLDataSourceTest.java</include> <include>**/dao/datasource/MySQLDataSourceTest.java</include>
<include>**/dao/entity/TaskInstanceTest.java</include> <include>**/dao/entity/TaskInstanceTest.java</include>
<include>**/dao/entity/UdfFuncTest.java</include> <include>**/dao/entity/UdfFuncTest.java</include>
<include>**/dao/upgrade/shell/CreateDolphinSchedulerTest.java</include>
<include>**/dao/upgrade/shell/InitDolphinSchedulerTest.java</include>
<include>**/remote/command/alert/AlertSendRequestCommandTest.java</include> <include>**/remote/command/alert/AlertSendRequestCommandTest.java</include>
<include>**/remote/command/alert/AlertSendResponseCommandTest.java</include> <include>**/remote/command/alert/AlertSendResponseCommandTest.java</include>
<include>**/remote/command/future/ResponseFutureTest.java</include> <include>**/remote/command/future/ResponseFutureTest.java</include>

Loading…
Cancel
Save