diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index 3f0bfaded0..9405756150 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -40,6 +40,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -68,7 +69,7 @@ public abstract class AbstractCommandExecutor { /** * log handler */ - protected Consumer> logHandler; + protected Consumer> logHandler; /** * logger @@ -78,7 +79,7 @@ public abstract class AbstractCommandExecutor { /** * log list */ - protected List logBuffer; + protected LinkedBlockingQueue logBuffer; protected boolean logOutputIsSuccess = false; @@ -92,16 +93,16 @@ public abstract class AbstractCommandExecutor { */ protected TaskRequest taskRequest; - public AbstractCommandExecutor(Consumer> logHandler, + public AbstractCommandExecutor(Consumer> logHandler, TaskRequest taskRequest, Logger logger) { this.logHandler = logHandler; this.taskRequest = taskRequest; this.logger = logger; - this.logBuffer = Collections.synchronizedList(new ArrayList<>()); + this.logBuffer = new LinkedBlockingQueue<>(); } - public AbstractCommandExecutor(List logBuffer) { + public AbstractCommandExecutor(LinkedBlockingQueue logBuffer) { this.logBuffer = logBuffer; } @@ -290,15 +291,15 @@ public abstract class AbstractCommandExecutor { */ private void clear() { - List markerList = new ArrayList<>(); - markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString()); + LinkedBlockingQueue markerLog = new LinkedBlockingQueue<>(1); + markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString()); if (!logBuffer.isEmpty()) { // log handle logHandler.accept(logBuffer); logBuffer.clear(); } - logHandler.accept(markerList); + logHandler.accept(markerLog); } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java index 27c568b226..662a15fbe9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java @@ -17,11 +17,15 @@ package org.apache.dolphinscheduler.plugin.task.api; +import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; + import org.apache.dolphinscheduler.plugin.task.util.LoggerUtils; import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import java.util.List; +import java.util.StringJoiner; +import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,12 +56,16 @@ public abstract class AbstractTaskExecutor extends AbstractTask { * * @param logs log list */ - public void logHandle(List logs) { + public void logHandle(LinkedBlockingQueue logs) { // note that the "new line" is added here to facilitate log parsing if (logs.contains(FINALIZE_SESSION_MARKER.toString())) { logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString()); } else { - logger.info(" -> {}", String.join("\n\t", logs)); + StringJoiner joiner = new StringJoiner("\n\t"); + while (!logs.isEmpty()) { + joiner.add(logs.poll()); + } + logger.info(" -> {}", joiner); } } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java index bd1f0b4086..5272c0e218 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java @@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; import org.slf4j.Logger; @@ -54,13 +55,13 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { * @param taskRequest taskRequest * @param logger logger */ - public ShellCommandExecutor(Consumer> logHandler, + public ShellCommandExecutor(Consumer> logHandler, TaskRequest taskRequest, Logger logger) { super(logHandler, taskRequest, logger); } - public ShellCommandExecutor(List logBuffer) { + public ShellCommandExecutor(LinkedBlockingQueue logBuffer) { super(logBuffer); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java index 007c5ddf62..8e118f94be 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java @@ -31,7 +31,7 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; import java.util.regex.Pattern; @@ -62,7 +62,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { * @param taskRequest TaskRequest * @param logger logger */ - public PythonCommandExecutor(Consumer> logHandler, + public PythonCommandExecutor(Consumer> logHandler, TaskRequest taskRequest, Logger logger) { super(logHandler, taskRequest, logger);