Browse Source

fix ResultSet not close and reformat code (#2183)

* fix ResultSet not close

* fix ResultSet not close

* extract code of closing resource to a method

* remove redundant if condition

* modify e2e timeout
pull/2/head
tswstarplanet 5 years ago committed by GitHub
parent
commit
f28ba55014
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 126
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
  2. 8
      e2e/src/test/resources/config/config.properties

126
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java

@ -50,25 +50,28 @@ public class TaskRecordDao {
static { static {
try { try {
conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES); conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES);
}catch (ConfigurationException e){ } catch (ConfigurationException e) {
logger.error("load configuration exception",e); logger.error("load configuration exception", e);
System.exit(1); System.exit(1);
} }
} }
/** /**
* get task record flag * get task record flag
*
* @return whether startup taskrecord * @return whether startup taskrecord
*/ */
public static boolean getTaskRecordFlag(){ public static boolean getTaskRecordFlag() {
return conf.getBoolean(Constants.TASK_RECORD_FLAG); return conf.getBoolean(Constants.TASK_RECORD_FLAG);
} }
/** /**
* create connection * create connection
*
* @return connection * @return connection
*/ */
private static Connection getConn() { private static Connection getConn() {
if(!getTaskRecordFlag()){ if (!getTaskRecordFlag()) {
return null; return null;
} }
String driver = "com.mysql.jdbc.Driver"; String driver = "com.mysql.jdbc.Driver";
@ -90,101 +93,96 @@ public class TaskRecordDao {
/** /**
* generate where sql string * generate where sql string
*
* @param filterMap filterMap * @param filterMap filterMap
* @return sql string * @return sql string
*/ */
private static String getWhereString(Map<String, String> filterMap) { private static String getWhereString(Map<String, String> filterMap) {
if(filterMap.size() ==0){ if (filterMap.size() == 0) {
return ""; return "";
} }
String result = " where 1=1 "; String result = " where 1=1 ";
Object taskName = filterMap.get("taskName"); Object taskName = filterMap.get("taskName");
if(taskName != null && StringUtils.isNotEmpty(taskName.toString())){ if (taskName != null && StringUtils.isNotEmpty(taskName.toString())) {
result += " and PROC_NAME like concat('%', '" + taskName.toString() + "', '%') "; result += " and PROC_NAME like concat('%', '" + taskName.toString() + "', '%') ";
} }
Object taskDate = filterMap.get("taskDate"); Object taskDate = filterMap.get("taskDate");
if(taskDate != null && StringUtils.isNotEmpty(taskDate.toString())){ if (taskDate != null && StringUtils.isNotEmpty(taskDate.toString())) {
result += " and PROC_DATE='" + taskDate.toString() + "'"; result += " and PROC_DATE='" + taskDate.toString() + "'";
} }
Object state = filterMap.get("state"); Object state = filterMap.get("state");
if(state != null && StringUtils.isNotEmpty(state.toString())){ if (state != null && StringUtils.isNotEmpty(state.toString())) {
result += " and NOTE='" + state.toString() + "'"; result += " and NOTE='" + state.toString() + "'";
} }
Object sourceTable = filterMap.get("sourceTable"); Object sourceTable = filterMap.get("sourceTable");
if(sourceTable!= null && StringUtils.isNotEmpty(sourceTable.toString())){ if (sourceTable != null && StringUtils.isNotEmpty(sourceTable.toString())) {
result += " and SOURCE_TAB like concat('%', '" + sourceTable.toString()+ "', '%')"; result += " and SOURCE_TAB like concat('%', '" + sourceTable.toString() + "', '%')";
} }
Object targetTable = filterMap.get("targetTable"); Object targetTable = filterMap.get("targetTable");
if(sourceTable!= null && StringUtils.isNotEmpty(targetTable.toString())){ if (sourceTable != null && StringUtils.isNotEmpty(targetTable.toString())) {
result += " and TARGET_TAB like concat('%', '"+ targetTable.toString()+"', '%') " ; result += " and TARGET_TAB like concat('%', '" + targetTable.toString() + "', '%') ";
} }
Object start = filterMap.get("startTime"); Object start = filterMap.get("startTime");
if(start != null && StringUtils.isNotEmpty(start.toString())){ if (start != null && StringUtils.isNotEmpty(start.toString())) {
result += " and STARTDATE>='" + start.toString() + "'"; result += " and STARTDATE>='" + start.toString() + "'";
} }
Object end = filterMap.get("endTime"); Object end = filterMap.get("endTime");
if(end != null && StringUtils.isNotEmpty(end.toString())){ if (end != null && StringUtils.isNotEmpty(end.toString())) {
result += " and ENDDATE>='" + end.toString()+ "'"; result += " and ENDDATE>='" + end.toString() + "'";
} }
return result; return result;
} }
/** /**
* count task record * count task record
*
* @param filterMap filterMap * @param filterMap filterMap
* @param table table * @param table table
* @return task record count * @return task record count
*/ */
public static int countTaskRecord(Map<String, String> filterMap, String table){ public static int countTaskRecord(Map<String, String> filterMap, String table) {
int count = 0; int count = 0;
Connection conn = null; Connection conn = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
ResultSet rs = null;
try { try {
conn = getConn(); conn = getConn();
if(conn == null){ if (conn == null) {
return count; return count;
} }
String sql = String.format("select count(1) as count from %s", table); String sql = String.format("select count(1) as count from %s", table);
sql += getWhereString(filterMap); sql += getWhereString(filterMap);
pstmt = conn.prepareStatement(sql); pstmt = conn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
while(rs.next()){ while (rs.next()) {
count = rs.getInt("count"); count = rs.getInt("count");
break; break;
} }
} catch (SQLException e) { } catch (SQLException e) {
logger.error("Exception ", e); logger.error("Exception ", e);
}finally { } finally {
try { closeResource(rs, pstmt, conn);
if(pstmt != null) {
pstmt.close();
}
if(conn != null){
conn.close();
}
} catch (SQLException e) {
logger.error("Exception ", e);
}
} }
return count; return count;
} }
/** /**
* query task record by filter map paging * query task record by filter map paging
*
* @param filterMap filterMap * @param filterMap filterMap
* @param table table * @param table table
* @return task record list * @return task record list
*/ */
public static List<TaskRecord> queryAllTaskRecord(Map<String,String> filterMap , String table) { public static List<TaskRecord> queryAllTaskRecord(Map<String, String> filterMap, String table) {
String sql = String.format("select * from %s", table); String sql = String.format("select * from %s", table);
sql += getWhereString(filterMap); sql += getWhereString(filterMap);
@ -194,9 +192,9 @@ public class TaskRecordDao {
sql += String.format(" order by STARTDATE desc limit %d,%d", offset, pageSize); sql += String.format(" order by STARTDATE desc limit %d,%d", offset, pageSize);
List<TaskRecord> recordList = new ArrayList<>(); List<TaskRecord> recordList = new ArrayList<>();
try{ try {
recordList = getQueryResult(sql); recordList = getQueryResult(sql);
}catch (Exception e){ } catch (Exception e) {
logger.error("Exception ", e); logger.error("Exception ", e);
} }
return recordList; return recordList;
@ -204,6 +202,7 @@ public class TaskRecordDao {
/** /**
* convert result set to task record * convert result set to task record
*
* @param resultSet resultSet * @param resultSet resultSet
* @return task record * @return task record
* @throws SQLException if error throws SQLException * @throws SQLException if error throws SQLException
@ -232,6 +231,7 @@ public class TaskRecordDao {
/** /**
* query task list by select sql * query task list by select sql
*
* @param selectSql select sql * @param selectSql select sql
* @return task record list * @return task record list
*/ */
@ -239,65 +239,81 @@ public class TaskRecordDao {
List<TaskRecord> recordList = new ArrayList<>(); List<TaskRecord> recordList = new ArrayList<>();
Connection conn = null; Connection conn = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
ResultSet rs = null;
try { try {
conn = getConn(); conn = getConn();
if(conn == null){ if (conn == null) {
return recordList; return recordList;
} }
pstmt = conn.prepareStatement(selectSql); pstmt = conn.prepareStatement(selectSql);
ResultSet rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
while(rs.next()){ while (rs.next()) {
TaskRecord taskRecord = convertToTaskRecord(rs); TaskRecord taskRecord = convertToTaskRecord(rs);
recordList.add(taskRecord); recordList.add(taskRecord);
} }
} catch (SQLException e) { } catch (SQLException e) {
logger.error("Exception ", e); logger.error("Exception ", e);
}finally { } finally {
try { closeResource(rs, pstmt, conn);
if(pstmt != null) {
pstmt.close();
}
if(conn != null){
conn.close();
}
} catch (SQLException e) {
logger.error("Exception ", e);
}
} }
return recordList; return recordList;
} }
/** /**
* according to procname and procdate query task record * according to procname and procdate query task record
*
* @param procName procName * @param procName procName
* @param procDate procDate * @param procDate procDate
* @return task record status * @return task record status
*/ */
public static TaskRecordStatus getTaskRecordState(String procName,String procDate){ public static TaskRecordStatus getTaskRecordState(String procName, String procDate) {
String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE like '%s'" String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE like '%s'"
,procName,procDate + "%"); , procName, procDate + "%");
List<TaskRecord> taskRecordList = getQueryResult(sql); List<TaskRecord> taskRecordList = getQueryResult(sql);
// contains no record and sql exception // contains no record and sql exception
if (CollectionUtils.isEmpty(taskRecordList)){ if (CollectionUtils.isEmpty(taskRecordList)) {
// exception // exception
return TaskRecordStatus.EXCEPTION; return TaskRecordStatus.EXCEPTION;
}else if (taskRecordList.size() > 1){ } else if (taskRecordList.size() > 1) {
return TaskRecordStatus.EXCEPTION; return TaskRecordStatus.EXCEPTION;
}else { } else {
TaskRecord taskRecord = taskRecordList.get(0); TaskRecord taskRecord = taskRecordList.get(0);
if (taskRecord == null){ if (taskRecord == null) {
return TaskRecordStatus.EXCEPTION; return TaskRecordStatus.EXCEPTION;
} }
Long targetRowCount = taskRecord.getTargetRowCount(); Long targetRowCount = taskRecord.getTargetRowCount();
if (targetRowCount <= 0){ if (targetRowCount <= 0) {
return TaskRecordStatus.FAILURE; return TaskRecordStatus.FAILURE;
}else { } else {
return TaskRecordStatus.SUCCESS; return TaskRecordStatus.SUCCESS;
} }
} }
} }
private static void closeResource(ResultSet rs, PreparedStatement pstmt, Connection conn) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
logger.error("Exception ", e);
}
}
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException e) {
logger.error("Exception ", e);
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
logger.error("Exception ", e);
}
}
}
} }

8
e2e/src/test/resources/config/config.properties

@ -27,13 +27,13 @@ PASSWORD=dolphinscheduler123
# driver path # driver path
driver.chromeDriver=/usr/local/bin/chromedriver driver.chromeDriver=/usr/local/bin/chromedriver
# implicitly wait(s) # implicitly wait(s)
driver.timeouts.implicitlyWait=10 driver.timeouts.implicitlyWait=30
# show wait(s) # show wait(s)
driver.timeouts.webDriverWait=10 driver.timeouts.webDriverWait=30
# page load timeout(s) # page load timeout(s)
driver.timeouts.pageLoadTimeout=10 driver.timeouts.pageLoadTimeout=30
# JS wait timeouts(s) # JS wait timeouts(s)
driver.timeouts.setScriptTimeout=10 driver.timeouts.setScriptTimeout=30
############### redis ############## ############### redis ##############

Loading…
Cancel
Save