Browse Source

fix: No operations allowed after statement closed when running sql task (#2265)

* sqlTask failed to run

* get correct attribute value and logger content

Co-authored-by: songqh <songquanhe@foxmail.com>
pull/3/MERGE
songquanhe-gitstudy 5 years ago committed by gaojun2048
parent
commit
fc18d4a216
  1. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  2. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  3. 24
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -419,13 +419,13 @@ public class OSUtils {
* @return check memory and cpu usage * @return check memory and cpu usage
*/ */
public static Boolean checkResource(double systemCpuLoad, double systemReservedMemory){ public static Boolean checkResource(double systemCpuLoad, double systemReservedMemory){
// judging usage // system load average
double loadAverage = OSUtils.loadAverage(); double loadAverage = OSUtils.loadAverage();
// // system available physical memory
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
if(loadAverage > systemCpuLoad || availablePhysicalMemorySize < systemReservedMemory){ if(loadAverage > systemCpuLoad || availablePhysicalMemorySize < systemReservedMemory){
logger.warn("load or availablePhysicalMemorySize(G) is too high, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize , loadAverage); logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize , loadAverage);
return false; return false;
}else{ }else{
return true; return true;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -34,7 +34,7 @@ public class WorkerConfig {
@Value("${worker.max.cpuload.avg}") @Value("${worker.max.cpuload.avg}")
private int workerMaxCpuloadAvg; private int workerMaxCpuloadAvg;
@Value("${master.reserved.memory}") @Value("${worker.reserved.memory}")
private double workerReservedMemory; private double workerReservedMemory;
public int getWorkerExecThreads() { public int getWorkerExecThreads() {

24
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -363,20 +363,20 @@ public class SqlTask extends AbstractTask {
// is the timeout set // is the timeout set
boolean timeoutFlag = taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || boolean timeoutFlag = taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED ||
taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
try (PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql())) { // prepare statement
if(timeoutFlag){ PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
stmt.setQueryTimeout(taskProps.getTaskTimeout()); if(timeoutFlag){
} stmt.setQueryTimeout(taskProps.getTaskTimeout());
Map<Integer, Property> params = sqlBinds.getParamsMap(); }
if(params != null) { Map<Integer, Property> params = sqlBinds.getParamsMap();
for (Map.Entry<Integer, Property> entry : params.entrySet()) { if(params != null) {
Property prop = entry.getValue(); for (Map.Entry<Integer, Property> entry : params.entrySet()) {
ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue()); Property prop = entry.getValue();
} ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
} }
logger.info("prepare statement replace sql : {} ", stmt);
return stmt;
} }
logger.info("prepare statement replace sql : {} ", stmt);
return stmt;
} }
/** /**

Loading…
Cancel
Save