Browse Source

[Plugin][Task]Fix Task log may be lost (#6294)

2.0.7-release
Kirs 3 years ago committed by GitHub
parent
commit
9115062838
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  2. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
  3. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
  4. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java

17
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -40,6 +40,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -68,7 +69,7 @@ public abstract class AbstractCommandExecutor {
/**
* log handler
*/
protected Consumer<List<String>> logHandler;
protected Consumer<LinkedBlockingQueue<String>> logHandler;
/**
* logger
@ -78,7 +79,7 @@ public abstract class AbstractCommandExecutor {
/**
* log list
*/
protected List<String> logBuffer;
protected LinkedBlockingQueue<String> logBuffer;
protected boolean logOutputIsSuccess = false;
@ -92,16 +93,16 @@ public abstract class AbstractCommandExecutor {
*/
protected TaskRequest taskRequest;
public AbstractCommandExecutor(Consumer<List<String>> logHandler,
public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskRequest taskRequest,
Logger logger) {
this.logHandler = logHandler;
this.taskRequest = taskRequest;
this.logger = logger;
this.logBuffer = Collections.synchronizedList(new ArrayList<>());
this.logBuffer = new LinkedBlockingQueue<>();
}
public AbstractCommandExecutor(List<String> logBuffer) {
public AbstractCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
this.logBuffer = logBuffer;
}
@ -290,15 +291,15 @@ public abstract class AbstractCommandExecutor {
*/
private void clear() {
List<String> markerList = new ArrayList<>();
markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
LinkedBlockingQueue<String> markerLog = new LinkedBlockingQueue<>(1);
markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
if (!logBuffer.isEmpty()) {
// log handle
logHandler.accept(logBuffer);
logBuffer.clear();
}
logHandler.accept(markerList);
logHandler.accept(markerLog);
}
/**

12
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java

@ -17,11 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.api;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
import org.apache.dolphinscheduler.plugin.task.util.LoggerUtils;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,12 +56,16 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
*
* @param logs log list
*/
public void logHandle(List<String> logs) {
public void logHandle(LinkedBlockingQueue<String> logs) {
// note that the "new line" is added here to facilitate log parsing
if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString());
} else {
logger.info(" -> {}", String.join("\n\t", logs));
StringJoiner joiner = new StringJoiner("\n\t");
while (!logs.isEmpty()) {
joiner.add(logs.poll());
}
logger.info(" -> {}", joiner);
}
}
}

5
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java

@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.slf4j.Logger;
@ -54,13 +55,13 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
* @param taskRequest taskRequest
* @param logger logger
*/
public ShellCommandExecutor(Consumer<List<String>> logHandler,
public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskRequest taskRequest,
Logger logger) {
super(logHandler, taskRequest, logger);
}
public ShellCommandExecutor(List<String> logBuffer) {
public ShellCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
super(logBuffer);
}

4
dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java

@ -31,7 +31,7 @@ import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.regex.Pattern;
@ -62,7 +62,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
* @param taskRequest TaskRequest
* @param logger logger
*/
public PythonCommandExecutor(Consumer<List<String>> logHandler,
public PythonCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskRequest taskRequest,
Logger logger) {
super(logHandler, taskRequest, logger);

Loading…
Cancel
Save