From 5a9cb7ee64783776ae6fa0865937d25234e0159a Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 2 Feb 2023 21:48:03 +0800 Subject: [PATCH] Fix task log might delay until task finished due to batch flush (#13502) --- .../task/api/AbstractCommandExecutor.java | 35 ++++--------------- .../plugin/task/api/TaskConstants.java | 5 --- 2 files changed, 6 insertions(+), 34 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index c79b4a8746..201684ddca 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -359,7 +359,7 @@ public abstract class AbstractCommandExecutor { } logOutputIsSuccess = true; } catch (Exception e) { - logger.error(e.getMessage(), e); + logger.error("Parse var pool error", e); logOutputIsSuccess = true; } }); @@ -369,17 +369,16 @@ public abstract class AbstractCommandExecutor { ExecutorService parseProcessOutputExecutorService = newDaemonSingleThreadExecutor(threadLoggerInfoName); parseProcessOutputExecutorService.submit(() -> { try { - long lastFlushTime = System.currentTimeMillis(); - while (logBuffer.size() > 0 || !logOutputIsSuccess) { - if (logBuffer.size() > 0) { - lastFlushTime = flush(lastFlushTime); + while (!logBuffer.isEmpty() || !logOutputIsSuccess) { + if (!logBuffer.isEmpty()) { + logHandler.accept(logBuffer); + logBuffer.clear(); } else { Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL); } } } catch (Exception e) { - Thread.currentThread().interrupt(); - logger.error(e.getMessage(), e); + logger.error("Output task log error", e); } finally { clear(); } @@ -438,28 +437,6 @@ public abstract class AbstractCommandExecutor { return processId; } - /** - * when log buffer siz or flush time reach condition , then flush - * - * @param lastFlushTime last flush time - * @return last flush time - */ - private long flush(long lastFlushTime) { - long now = System.currentTimeMillis(); - - /* - * when log buffer siz or flush time reach condition , then flush - */ - if (logBuffer.size() >= TaskConstants.DEFAULT_LOG_ROWS_NUM - || now - lastFlushTime > TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL) { - lastFlushTime = now; - logHandler.accept(logBuffer); - - logBuffer.clear(); - } - return lastFlushTime; - } - protected abstract String buildCommandFilePath(); protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java index 8337dc6f31..a0928d0eac 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java @@ -134,11 +134,6 @@ public class TaskConstants { public static final String SH = "sh"; - /** - * default log cache rows num,output when reach the number - */ - public static final int DEFAULT_LOG_ROWS_NUM = 4 * 16; - /** * log flush interval?output when reach the interval */