Browse Source

fix ResultSet not close and reformat code (#2183)

* fix ResultSet not close

* fix ResultSet not close

* extract code of closing resource to a method

* remove redundant if condition

* modify e2e timeout
pull/3/MERGE
tswstarplanet 4 years ago committed by gaojun2048
parent
commit
1d4cad7913
  1. 134
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
  2. 8
      e2e/src/test/resources/config/config.properties

134
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java

@ -50,25 +50,28 @@ public class TaskRecordDao {
static {
try {
conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES);
}catch (ConfigurationException e){
logger.error("load configuration exception",e);
} catch (ConfigurationException e) {
logger.error("load configuration exception", e);
System.exit(1);
}
}
/**
* get task record flag
* get task record flag
*
* @return whether startup taskrecord
*/
public static boolean getTaskRecordFlag(){
return conf.getBoolean(Constants.TASK_RECORD_FLAG);
public static boolean getTaskRecordFlag() {
return conf.getBoolean(Constants.TASK_RECORD_FLAG);
}
/**
* create connection
*
* @return connection
*/
private static Connection getConn() {
if(!getTaskRecordFlag()){
if (!getTaskRecordFlag()) {
return null;
}
String driver = "com.mysql.jdbc.Driver";
@ -90,101 +93,96 @@ public class TaskRecordDao {
/**
* generate where sql string
*
* @param filterMap filterMap
* @return sql string
*/
private static String getWhereString(Map<String, String> filterMap) {
if(filterMap.size() ==0){
if (filterMap.size() == 0) {
return "";
}
String result = " where 1=1 ";
Object taskName = filterMap.get("taskName");
if(taskName != null && StringUtils.isNotEmpty(taskName.toString())){
if (taskName != null && StringUtils.isNotEmpty(taskName.toString())) {
result += " and PROC_NAME like concat('%', '" + taskName.toString() + "', '%') ";
}
Object taskDate = filterMap.get("taskDate");
if(taskDate != null && StringUtils.isNotEmpty(taskDate.toString())){
if (taskDate != null && StringUtils.isNotEmpty(taskDate.toString())) {
result += " and PROC_DATE='" + taskDate.toString() + "'";
}
Object state = filterMap.get("state");
if(state != null && StringUtils.isNotEmpty(state.toString())){
if (state != null && StringUtils.isNotEmpty(state.toString())) {
result += " and NOTE='" + state.toString() + "'";
}
Object sourceTable = filterMap.get("sourceTable");
if(sourceTable!= null && StringUtils.isNotEmpty(sourceTable.toString())){
result += " and SOURCE_TAB like concat('%', '" + sourceTable.toString()+ "', '%')";
if (sourceTable != null && StringUtils.isNotEmpty(sourceTable.toString())) {
result += " and SOURCE_TAB like concat('%', '" + sourceTable.toString() + "', '%')";
}
Object targetTable = filterMap.get("targetTable");
if(sourceTable!= null && StringUtils.isNotEmpty(targetTable.toString())){
result += " and TARGET_TAB like concat('%', '"+ targetTable.toString()+"', '%') " ;
if (sourceTable != null && StringUtils.isNotEmpty(targetTable.toString())) {
result += " and TARGET_TAB like concat('%', '" + targetTable.toString() + "', '%') ";
}
Object start = filterMap.get("startTime");
if(start != null && StringUtils.isNotEmpty(start.toString())){
if (start != null && StringUtils.isNotEmpty(start.toString())) {
result += " and STARTDATE>='" + start.toString() + "'";
}
Object end = filterMap.get("endTime");
if(end != null && StringUtils.isNotEmpty(end.toString())){
result += " and ENDDATE>='" + end.toString()+ "'";
if (end != null && StringUtils.isNotEmpty(end.toString())) {
result += " and ENDDATE>='" + end.toString() + "'";
}
return result;
}
/**
* count task record
*
* @param filterMap filterMap
* @param table table
* @param table table
* @return task record count
*/
public static int countTaskRecord(Map<String, String> filterMap, String table){
public static int countTaskRecord(Map<String, String> filterMap, String table) {
int count = 0;
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = getConn();
if(conn == null){
if (conn == null) {
return count;
}
String sql = String.format("select count(1) as count from %s", table);
sql += getWhereString(filterMap);
pstmt = conn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery();
while(rs.next()){
rs = pstmt.executeQuery();
while (rs.next()) {
count = rs.getInt("count");
break;
}
} catch (SQLException e) {
logger.error("Exception ", e);
}finally {
try {
if(pstmt != null) {
pstmt.close();
}
if(conn != null){
conn.close();
}
} catch (SQLException e) {
logger.error("Exception ", e);
}
} finally {
closeResource(rs, pstmt, conn);
}
return count;
}
/**
* query task record by filter map paging
*
* @param filterMap filterMap
* @param table table
* @param table table
* @return task record list
*/
public static List<TaskRecord> queryAllTaskRecord(Map<String,String> filterMap , String table) {
public static List<TaskRecord> queryAllTaskRecord(Map<String, String> filterMap, String table) {
String sql = String.format("select * from %s", table);
sql += getWhereString(filterMap);
@ -194,9 +192,9 @@ public class TaskRecordDao {
sql += String.format(" order by STARTDATE desc limit %d,%d", offset, pageSize);
List<TaskRecord> recordList = new ArrayList<>();
try{
try {
recordList = getQueryResult(sql);
}catch (Exception e){
} catch (Exception e) {
logger.error("Exception ", e);
}
return recordList;
@ -204,6 +202,7 @@ public class TaskRecordDao {
/**
* convert result set to task record
*
* @param resultSet resultSet
* @return task record
* @throws SQLException if error throws SQLException
@ -232,6 +231,7 @@ public class TaskRecordDao {
/**
* query task list by select sql
*
* @param selectSql select sql
* @return task record list
*/
@ -239,65 +239,81 @@ public class TaskRecordDao {
List<TaskRecord> recordList = new ArrayList<>();
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = getConn();
if(conn == null){
if (conn == null) {
return recordList;
}
pstmt = conn.prepareStatement(selectSql);
ResultSet rs = pstmt.executeQuery();
rs = pstmt.executeQuery();
while(rs.next()){
while (rs.next()) {
TaskRecord taskRecord = convertToTaskRecord(rs);
recordList.add(taskRecord);
}
} catch (SQLException e) {
logger.error("Exception ", e);
}finally {
try {
if(pstmt != null) {
pstmt.close();
}
if(conn != null){
conn.close();
}
} catch (SQLException e) {
logger.error("Exception ", e);
}
} finally {
closeResource(rs, pstmt, conn);
}
return recordList;
}
/**
* according to procname and procdate query task record
*
* @param procName procName
* @param procDate procDate
* @return task record status
*/
public static TaskRecordStatus getTaskRecordState(String procName,String procDate){
public static TaskRecordStatus getTaskRecordState(String procName, String procDate) {
String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE like '%s'"
,procName,procDate + "%");
, procName, procDate + "%");
List<TaskRecord> taskRecordList = getQueryResult(sql);
// contains no record and sql exception
if (CollectionUtils.isEmpty(taskRecordList)){
if (CollectionUtils.isEmpty(taskRecordList)) {
// exception
return TaskRecordStatus.EXCEPTION;
}else if (taskRecordList.size() > 1){
} else if (taskRecordList.size() > 1) {
return TaskRecordStatus.EXCEPTION;
}else {
} else {
TaskRecord taskRecord = taskRecordList.get(0);
if (taskRecord == null){
if (taskRecord == null) {
return TaskRecordStatus.EXCEPTION;
}
Long targetRowCount = taskRecord.getTargetRowCount();
if (targetRowCount <= 0){
if (targetRowCount <= 0) {
return TaskRecordStatus.FAILURE;
}else {
} else {
return TaskRecordStatus.SUCCESS;
}
}
}
private static void closeResource(ResultSet rs, PreparedStatement pstmt, Connection conn) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
logger.error("Exception ", e);
}
}
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException e) {
logger.error("Exception ", e);
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
logger.error("Exception ", e);
}
}
}
}

8
e2e/src/test/resources/config/config.properties

@ -27,13 +27,13 @@ PASSWORD=dolphinscheduler123
# driver path
driver.chromeDriver=/usr/local/bin/chromedriver
# implicitly wait(s)
driver.timeouts.implicitlyWait=10
driver.timeouts.implicitlyWait=30
# show wait(s)
driver.timeouts.webDriverWait=10
driver.timeouts.webDriverWait=30
# page load timeout(s)
driver.timeouts.pageLoadTimeout=10
driver.timeouts.pageLoadTimeout=30
# JS wait timeouts(s)
driver.timeouts.setScriptTimeout=10
driver.timeouts.setScriptTimeout=30
############### redis ##############

Loading…
Cancel
Save