Browse Source

Fix task log might delay until task finished due to batch flush (#13502)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
5a9cb7ee64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 35
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  2. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java

35
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -359,7 +359,7 @@ public abstract class AbstractCommandExecutor {
}
logOutputIsSuccess = true;
} catch (Exception e) {
logger.error(e.getMessage(), e);
logger.error("Parse var pool error", e);
logOutputIsSuccess = true;
}
});
@ -369,17 +369,16 @@ public abstract class AbstractCommandExecutor {
ExecutorService parseProcessOutputExecutorService = newDaemonSingleThreadExecutor(threadLoggerInfoName);
parseProcessOutputExecutorService.submit(() -> {
try {
long lastFlushTime = System.currentTimeMillis();
while (logBuffer.size() > 0 || !logOutputIsSuccess) {
if (logBuffer.size() > 0) {
lastFlushTime = flush(lastFlushTime);
while (!logBuffer.isEmpty() || !logOutputIsSuccess) {
if (!logBuffer.isEmpty()) {
logHandler.accept(logBuffer);
logBuffer.clear();
} else {
Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
}
}
} catch (Exception e) {
Thread.currentThread().interrupt();
logger.error(e.getMessage(), e);
logger.error("Output task log error", e);
} finally {
clear();
}
@ -438,28 +437,6 @@ public abstract class AbstractCommandExecutor {
return processId;
}
/**
* when log buffer siz or flush time reach condition , then flush
*
* @param lastFlushTime last flush time
* @return last flush time
*/
private long flush(long lastFlushTime) {
long now = System.currentTimeMillis();
/*
* when log buffer siz or flush time reach condition , then flush
*/
if (logBuffer.size() >= TaskConstants.DEFAULT_LOG_ROWS_NUM
|| now - lastFlushTime > TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL) {
lastFlushTime = now;
logHandler.accept(logBuffer);
logBuffer.clear();
}
return lastFlushTime;
}
protected abstract String buildCommandFilePath();
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;

5
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java

@ -134,11 +134,6 @@ public class TaskConstants {
public static final String SH = "sh";
/**
* default log cache rows num,output when reach the number
*/
public static final int DEFAULT_LOG_ROWS_NUM = 4 * 16;
/**
* log flush interval?output when reach the interval
*/

Loading…
Cancel
Save