From 30af55b82ae560a5300930af062ad3a88d542e3d Mon Sep 17 00:00:00 2001 From: Wenjun Ruan <861923274@qq.com> Date: Sat, 10 Jul 2021 01:14:59 +0800 Subject: [PATCH] [Improvement][Worker] Task log may be lost #5775 (#5783) --- .../worker/task/AbstractCommandExecutor.java | 61 ++++--------------- .../server/worker/task/AbstractTask.java | 10 +-- .../worker/task/PythonCommandExecutor.java | 3 +- .../worker/task/ShellCommandExecutor.java | 7 +-- .../worker/task/sqoop/SqoopTaskTest.java | 7 ++- 5 files changed, 25 insertions(+), 63 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 40efdd391a..47d3e8a394 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -48,6 +48,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Matcher; @@ -55,7 +56,6 @@ import java.util.regex.Pattern; import org.slf4j.Logger; - /** * abstract command executor */ @@ -74,7 +74,7 @@ public abstract class AbstractCommandExecutor { /** * log handler */ - protected Consumer> logHandler; + protected Consumer> logHandler; /** * logger @@ -82,9 +82,9 @@ public abstract class AbstractCommandExecutor { protected Logger logger; /** - * log list + * log collection */ - protected final List logBuffer; + protected final LinkedBlockingQueue logBuffer; protected boolean logOutputIsScuccess = false; @@ -98,20 +98,16 @@ public abstract class AbstractCommandExecutor { */ private TaskExecutionContextCacheManager taskExecutionContextCacheManager; - public AbstractCommandExecutor(Consumer> logHandler, + public AbstractCommandExecutor(Consumer> logHandler, TaskExecutionContext taskExecutionContext, Logger logger) { this.logHandler = logHandler; this.taskExecutionContext = taskExecutionContext; this.logger = logger; - this.logBuffer = Collections.synchronizedList(new ArrayList<>()); + this.logBuffer = new LinkedBlockingQueue<>(); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); } - protected AbstractCommandExecutor(List logBuffer) { - this.logBuffer = logBuffer; - } - /** * build process * @@ -232,7 +228,6 @@ public abstract class AbstractCommandExecutor { return varPool.toString(); } - /** * cancel application * @@ -329,15 +324,14 @@ public abstract class AbstractCommandExecutor { */ private void clear() { - List markerList = new ArrayList<>(); - markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString()); + LinkedBlockingQueue markerLog = new LinkedBlockingQueue<>(); + markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString()); if (!logBuffer.isEmpty()) { // log handle logHandler.accept(logBuffer); - logBuffer.clear(); } - logHandler.accept(markerList); + logHandler.accept(markerLog); } /** @@ -349,9 +343,7 @@ public abstract class AbstractCommandExecutor { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService"); getOutputLogService.submit(() -> { - BufferedReader inReader = null; - try { - inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); + try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { String line; logBuffer.add("welcome to use bigdata scheduling system..."); while ((line = inReader.readLine()) != null) { @@ -366,7 +358,6 @@ public abstract class AbstractCommandExecutor { logger.error(e.getMessage(), e); } finally { logOutputIsScuccess = true; - close(inReader); } }); getOutputLogService.shutdown(); @@ -460,31 +451,20 @@ public abstract class AbstractCommandExecutor { * @return line list */ private List convertFile2List(String filename) { - List lineList = new ArrayList(100); + List lineList = new ArrayList<>(100); File file = new File(filename); if (!file.exists()) { return lineList; } - BufferedReader br = null; - try { - br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8)); + try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8))) { String line = null; while ((line = br.readLine()) != null) { lineList.add(line); } } catch (Exception e) { logger.error(String.format("read file: %s failed : ", filename), e); - } finally { - if (br != null) { - try { - br.close(); - } catch (IOException e) { - logger.error(e.getMessage(), e); - } - } - } return lineList; } @@ -556,27 +536,10 @@ public abstract class AbstractCommandExecutor { lastFlushTime = now; /** log handle */ logHandler.accept(logBuffer); - - logBuffer.clear(); } return lastFlushTime; } - /** - * close buffer reader - * - * @param inReader in reader - */ - private void close(BufferedReader inReader) { - if (inReader != null) { - try { - inReader.close(); - } catch (IOException e) { - logger.error(e.getMessage(), e); - } - } - } - protected List commandOptions() { return Collections.emptyList(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 81b80974b6..f84b6d75bc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import java.util.List; import java.util.StringJoiner; +import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; @@ -116,15 +116,15 @@ public abstract class AbstractTask { * * @param logs log list */ - public void logHandle(List logs) { + public void logHandle(LinkedBlockingQueue logs) { // note that the "new line" is added here to facilitate log parsing if (logs.contains(FINALIZE_SESSION_MARKER.toString())) { logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString()); } else { - // note: if the logs is a SynchronizedList and will be modified concurrently, - // we should must use foreach to iterate the element, otherwise will throw a ConcurrentModifiedException(#issue 5528) StringJoiner joiner = new StringJoiner("\n\t"); - logs.forEach(joiner::add); + while (!logs.isEmpty()) { + joiner.add(logs.poll()); + } logger.info(" -> {}", joiner); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java index edf102b694..2aa80f1ace 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java @@ -32,6 +32,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collections; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; import java.util.regex.Pattern; @@ -60,7 +61,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { * @param taskExecutionContext taskExecutionContext * @param logger logger */ - public PythonCommandExecutor(Consumer> logHandler, + public PythonCommandExecutor(Consumer> logHandler, TaskExecutionContext taskExecutionContext, Logger logger) { super(logHandler,taskExecutionContext,logger); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java index b547ef9817..bfc1ae4240 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java @@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; import org.slf4j.Logger; @@ -53,16 +54,12 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { * @param taskExecutionContext taskExecutionContext * @param logger logger */ - public ShellCommandExecutor(Consumer> logHandler, + public ShellCommandExecutor(Consumer> logHandler, TaskExecutionContext taskExecutionContext, Logger logger) { super(logHandler,taskExecutionContext,logger); } - public ShellCommandExecutor(List logBuffer) { - super(logBuffer); - } - @Override protected String buildCommandFilePath() { // command file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java index 222c35593a..f72a943142 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import org.junit.Assert; import org.junit.Before; @@ -205,15 +206,15 @@ public class SqoopTaskTest { @Test public void testLogHandler() throws InterruptedException { - List list = Collections.synchronizedList(new ArrayList<>()); + LinkedBlockingQueue loggerBuffer = new LinkedBlockingQueue<>(); Thread thread1 = new Thread(() -> { for (int i = 0; i < 10; i++) { - list.add("test add log"); + loggerBuffer.add("test add log"); } }); Thread thread2 = new Thread(() -> { for (int i = 0; i < 10; i++) { - sqoopTask.logHandle(list); + sqoopTask.logHandle(loggerBuffer); } }); thread1.start();