Browse Source

refactor code style

pull/3/MERGE
baoliang 4 years ago
parent
commit
d4d7ca32e0
  1. 159
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
  2. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java

159
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java

@ -90,7 +90,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
/**
* init schema
*/
public void initSchema(){
public void initSchema() {
DbType dbType = getDbType();
String initSqlPath = "";
if (dbType != null) {
@ -113,6 +113,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
/**
* init scheam
*
* @param initSqlPath initSqlPath
*/
public void initSchema(String initSqlPath) {
@ -128,6 +129,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
/**
* run DML
*
* @param initSqlPath initSqlPath
*/
private void runInitDML(String initSqlPath) {
@ -150,20 +152,20 @@ public abstract class UpgradeDao extends AbstractBaseDao {
try {
conn.rollback();
} catch (SQLException e1) {
logger.error(e1.getMessage(),e1);
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
try {
if (null != conn) {
conn.rollback();
}
} catch (SQLException e1) {
logger.error(e1.getMessage(),e1);
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(conn);
@ -173,6 +175,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
/**
* run DDL
*
* @param initSqlPath initSqlPath
*/
private void runInitDDL(String initSqlPath) {
@ -191,12 +194,12 @@ public abstract class UpgradeDao extends AbstractBaseDao {
} catch (IOException e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(conn);
@ -206,6 +209,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
/**
* determines whether a table exists
*
* @param tableName tableName
* @return if table exist return trueelse return false
*/
@ -213,20 +217,22 @@ public abstract class UpgradeDao extends AbstractBaseDao {
/**
* determines whether a field exists in the specified table
* @param tableName tableName
*
* @param tableName tableName
* @param columnName columnName
* @return if column name exist return trueelse return false
* @return if column name exist return trueelse return false
*/
public abstract boolean isExistsColumn(String tableName,String columnName);
public abstract boolean isExistsColumn(String tableName, String columnName);
/**
* get current version
*
* @param versionName versionName
* @return version
*/
public String getCurrentVersion(String versionName) {
String sql = String.format("select version from %s",versionName);
String sql = String.format("select version from %s", versionName);
Connection conn = null;
ResultSet rs = null;
PreparedStatement pstmt = null;
@ -243,7 +249,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
return version;
} catch (SQLException e) {
logger.error(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
@ -253,6 +259,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
/**
* upgrade DolphinScheduler
*
* @param schemaDir schema dir
*/
public void upgradeDolphinScheduler(String schemaDir) {
@ -282,109 +289,109 @@ public abstract class UpgradeDao extends AbstractBaseDao {
/**
* updateProcessDefinitionJsonWorkerGroup
*/
protected void updateProcessDefinitionJsonWorkerGroup(){
protected void updateProcessDefinitionJsonWorkerGroup() {
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer,String> replaceProcessDefinitionMap = new HashMap<>();
Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<Integer, String> oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()){
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0 ;i < tasks.size() ; i++){
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
ObjectNode workerGroupNode = (ObjectNode) task.path("workerGroupId");
Integer workerGroupId = -1;
if(workerGroupNode != null && workerGroupNode.canConvertToInt()){
if (workerGroupNode != null && workerGroupNode.canConvertToInt()) {
workerGroupId = workerGroupNode.asInt(-1);
}
if (workerGroupId == -1) {
task.put("workerGroup", "default");
}else {
} else {
task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId));
}
}
jsonObject.remove("task");
jsonObject.put("tasks",tasks);
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toString());
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0){
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap);
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), replaceProcessDefinitionMap);
}
}catch (Exception e){
logger.error("update process definition json workergroup error",e);
} catch (Exception e) {
logger.error("update process definition json workergroup error", e);
}
}
/**
* updateProcessDefinitionJsonResourceList
*/
protected void updateProcessDefinitionJsonResourceList(){
protected void updateProcessDefinitionJsonResourceList() {
ResourceDao resourceDao = new ResourceDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer,String> replaceProcessDefinitionMap = new HashMap<>();
Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<String,Integer> resourcesMap = resourceDao.listAllResources(dataSource.getConnection());
Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
Map<String, Integer> resourcesMap = resourceDao.listAllResources(dataSource.getConnection());
Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()){
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0 ;i < tasks.size() ; i++){
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.get(i);
ObjectNode param = (ObjectNode)task.get("params");
ObjectNode param = (ObjectNode) task.get("params");
if (param != null) {
List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
ResourceInfo mainJar = JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class);
if (mainJar != null && mainJar.getId() == 0) {
String fullName = mainJar.getRes().startsWith("/") ? mainJar.getRes() : String.format("/%s",mainJar.getRes());
String fullName = mainJar.getRes().startsWith("/") ? mainJar.getRes() : String.format("/%s", mainJar.getRes());
if (resourcesMap.containsKey(fullName)) {
mainJar.setId(resourcesMap.get(fullName));
param.put("mainJar",JSONUtils.parseObject(JSONUtils.toJsonString(mainJar)));
param.put("mainJar", JSONUtils.parseObject(JSONUtils.toJsonString(mainJar)));
}
}
if (CollectionUtils.isNotEmpty(resourceList)) {
List<ResourceInfo> newResourceList = resourceList.stream().map(resInfo -> {
String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes() : String.format("/%s",resInfo.getRes());
String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes() : String.format("/%s", resInfo.getRes());
if (resInfo.getId() == 0 && resourcesMap.containsKey(fullName)) {
resInfo.setId(resourcesMap.get(fullName));
}
return resInfo;
}).collect(Collectors.toList());
param.put("resourceList",JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList)));
param.put("resourceList", JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList)));
}
}
task.put("params",param);
task.put("params", param);
}
jsonObject.remove("tasks");
jsonObject.put("tasks",tasks);
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toString());
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0){
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap);
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), replaceProcessDefinitionMap);
}
}catch (Exception e){
logger.error("update process definition json resource list error",e);
} catch (Exception e) {
logger.error("update process definition json resource list error", e);
}
}
/**
* upgradeDolphinScheduler DML
*
* @param schemaDir schemaDir
*/
private void upgradeDolphinSchedulerDML(String schemaDir) {
@ -392,8 +399,8 @@ public abstract class UpgradeDao extends AbstractBaseDao {
if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not found");
}
String sqlFilePath = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/dolphinscheduler_dml.sql",rootDir,schemaDir,getDbType().name().toLowerCase());
logger.info("sqlSQLFilePath"+sqlFilePath);
String sqlFilePath = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/dolphinscheduler_dml.sql", rootDir, schemaDir, getDbType().name().toLowerCase());
logger.info("sqlSQLFilePath" + sqlFilePath);
Connection conn = null;
PreparedStatement pstmt = null;
try {
@ -405,13 +412,13 @@ public abstract class UpgradeDao extends AbstractBaseDao {
scriptRunner.runScript(sqlReader);
if (isExistsTable(T_VERSION_NAME)) {
// Change version in the version table to the new version
String upgradeSQL = String.format("update %s set version = ?",T_VERSION_NAME);
String upgradeSQL = String.format("update %s set version = ?", T_VERSION_NAME);
pstmt = conn.prepareStatement(upgradeSQL);
pstmt.setString(1, schemaVersion);
pstmt.executeUpdate();
}else if (isExistsTable(T_NEW_VERSION_NAME)) {
} else if (isExistsTable(T_NEW_VERSION_NAME)) {
// Change version in the version table to the new version
String upgradeSQL = String.format("update %s set version = ?",T_NEW_VERSION_NAME);
String upgradeSQL = String.format("update %s set version = ?", T_NEW_VERSION_NAME);
pstmt = conn.prepareStatement(upgradeSQL);
pstmt.setString(1, schemaVersion);
pstmt.executeUpdate();
@ -421,38 +428,38 @@ public abstract class UpgradeDao extends AbstractBaseDao {
try {
conn.rollback();
} catch (SQLException e1) {
logger.error(e1.getMessage(),e1);
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException("sql file not found ", e);
} catch (IOException e) {
try {
conn.rollback();
} catch (SQLException e1) {
logger.error(e1.getMessage(),e1);
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (SQLException e) {
try {
if (null != conn) {
conn.rollback();
}
} catch (SQLException e1) {
logger.error(e1.getMessage(),e1);
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
try {
if (null != conn) {
conn.rollback();
}
} catch (SQLException e1) {
logger.error(e1.getMessage(),e1);
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
}
@ -461,13 +468,14 @@ public abstract class UpgradeDao extends AbstractBaseDao {
/**
* upgradeDolphinScheduler DDL
*
* @param schemaDir schemaDir
*/
private void upgradeDolphinSchedulerDDL(String schemaDir) {
if (StringUtils.isEmpty(rootDir)) {
throw new RuntimeException("Environment variable user.dir not found");
}
String sqlFilePath = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/dolphinscheduler_ddl.sql",rootDir,schemaDir,getDbType().name().toLowerCase());
String sqlFilePath = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/dolphinscheduler_ddl.sql", rootDir, schemaDir, getDbType().name().toLowerCase());
Connection conn = null;
PreparedStatement pstmt = null;
try {
@ -482,20 +490,20 @@ public abstract class UpgradeDao extends AbstractBaseDao {
} catch (FileNotFoundException e) {
logger.error(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException("sql file not found ", e);
} catch (IOException e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (SQLException e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
}
@ -505,15 +513,16 @@ public abstract class UpgradeDao extends AbstractBaseDao {
/**
* update version
*
* @param version version
*/
public void updateVersion(String version) {
// Change version in the version table to the new version
String versionName = T_VERSION_NAME;
if(!SchemaUtils.isAGreatVersion("1.2.0" , version)){
if (!SchemaUtils.isAGreatVersion("1.2.0", version)) {
versionName = "t_ds_version";
}
String upgradeSQL = String.format("update %s set version = ?",versionName);
String upgradeSQL = String.format("update %s set version = ?", versionName);
PreparedStatement pstmt = null;
Connection conn = null;
try {
@ -523,7 +532,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
pstmt.executeUpdate();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + upgradeSQL, e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java

@ -35,8 +35,8 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

Loading…
Cancel
Save