From f40e795f1012355da26389521c2099ac604a4f89 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan <861923274@qq.com> Date: Wed, 28 Jul 2021 19:58:30 +0800 Subject: [PATCH] [Improvement][Worker] Task log may be lost #5775 (#5907) --- .../dolphinscheduler/common/Constants.java | 5 ++ .../worker/task/AbstractCommandExecutor.java | 63 +++++-------------- .../server/worker/task/AbstractTask.java | 25 +++++--- .../worker/task/PythonCommandExecutor.java | 3 +- .../worker/task/ShellCommandExecutor.java | 4 +- .../server/worker/task/sql/SqlTask.java | 41 +++++++++--- .../worker/task/sqoop/SqoopTaskTest.java | 23 +++++++ 7 files changed, 97 insertions(+), 67 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index d25ddd082a..e0a96d4ac7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -388,6 +388,11 @@ public final class Constants { */ public static final int DEFAULT_LOG_FLUSH_INTERVAL = 1000; + /** + * default log buffer size in AbstractCommandExecutor + */ + public static final int DEFAULT_LOG_BUFFER_SIZE = 128; + /** * time unit secong to minutes diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 377d2c8847..1616fad487 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -41,6 +41,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Matcher; @@ -67,7 +68,7 @@ public abstract class AbstractCommandExecutor { /** * log handler */ - protected Consumer> logHandler; + protected Consumer> logHandler; /** * logger @@ -75,9 +76,9 @@ public abstract class AbstractCommandExecutor { protected Logger logger; /** - * log list + * log collection */ - protected final List logBuffer; + protected final LinkedBlockingQueue logBuffer; /** * taskExecutionContext @@ -92,13 +93,13 @@ public abstract class AbstractCommandExecutor { */ private TaskExecutionContextCacheManager taskExecutionContextCacheManager; - public AbstractCommandExecutor(Consumer> logHandler, - TaskExecutionContext taskExecutionContext , - Logger logger){ + public AbstractCommandExecutor(Consumer> logHandler, + TaskExecutionContext taskExecutionContext, + Logger logger) { this.logHandler = logHandler; this.taskExecutionContext = taskExecutionContext; this.logger = logger; - this.logBuffer = Collections.synchronizedList(new ArrayList<>()); + this.logBuffer = new LinkedBlockingQueue<>(Constants.DEFAULT_LOG_BUFFER_SIZE); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); } @@ -331,15 +332,14 @@ public abstract class AbstractCommandExecutor { */ private void clear() { - List markerList = new ArrayList<>(); - markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString()); + LinkedBlockingQueue markerLog = new LinkedBlockingQueue<>(1); + markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString()); if (!logBuffer.isEmpty()) { // log handle logHandler.accept(logBuffer); - logBuffer.clear(); } - logHandler.accept(markerList); + logHandler.accept(markerLog); } /** @@ -350,9 +350,7 @@ public abstract class AbstractCommandExecutor { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService"); getOutputLogService.submit(() -> { - BufferedReader inReader = null; - try { - inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); + try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { String line; while ((line = inReader.readLine()) != null) { logBuffer.add(line); @@ -361,7 +359,6 @@ public abstract class AbstractCommandExecutor { logger.error(e.getMessage(), e); } finally { logOutputIsSuccess = true; - close(inReader); } }); getOutputLogService.shutdown(); @@ -450,31 +447,20 @@ public abstract class AbstractCommandExecutor { * @return line list */ private List convertFile2List(String filename) { - List lineList = new ArrayList(100); - File file=new File(filename); + List lineList = new ArrayList<>(100); + File file = new File(filename); if (!file.exists()){ return lineList; } - BufferedReader br = null; - try { - br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8)); + try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8))) { String line = null; while ((line = br.readLine()) != null) { lineList.add(line); } } catch (Exception e) { - logger.error(String.format("read file: %s failed : ",filename),e); - } finally { - if(br != null){ - try { - br.close(); - } catch (IOException e) { - logger.error(e.getMessage(),e); - } - } - + logger.error(String.format("read file: %s failed : ", filename), e); } return lineList; } @@ -551,27 +537,10 @@ public abstract class AbstractCommandExecutor { lastFlushTime = now; /** log handle */ logHandler.accept(logBuffer); - - logBuffer.clear(); } return lastFlushTime; } - /** - * close buffer reader - * - * @param inReader in reader - */ - private void close(BufferedReader inReader) { - if (inReader != null) { - try { - inReader.close(); - } catch (IOException e) { - logger.error(e.getMessage(), e); - } - } - } - protected List commandOptions() { return Collections.emptyList(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 84c5052f29..fa2a355c4d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -42,8 +42,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.commons.lang.StringUtils; -import java.util.List; import java.util.Map; +import java.util.StringJoiner; +import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; @@ -58,13 +59,13 @@ public abstract class AbstractTask { TaskExecutionContext taskExecutionContext; /** - * log record + * log record */ protected Logger logger; /** - * SHELL process pid + * SHELL process pid */ protected int processId; @@ -80,14 +81,15 @@ public abstract class AbstractTask { protected volatile boolean cancel = false; /** - * exit code + * exit code */ protected volatile int exitStatusCode = -1; /** * constructor + * * @param taskExecutionContext taskExecutionContext - * @param logger logger + * @param logger logger */ protected AbstractTask(TaskExecutionContext taskExecutionContext, Logger logger) { this.taskExecutionContext = taskExecutionContext; @@ -96,6 +98,7 @@ public abstract class AbstractTask { /** * init task + * * @throws Exception exception */ public void init() throws Exception { @@ -103,13 +106,14 @@ public abstract class AbstractTask { /** * task handle + * * @throws Exception exception */ public abstract void handle() throws Exception; - /** * cancel application + * * @param status status * @throws Exception exception */ @@ -119,14 +123,19 @@ public abstract class AbstractTask { /** * log handle + * * @param logs log list */ - public void logHandle(List logs) { + public void logHandle(LinkedBlockingQueue logs) { // note that the "new line" is added here to facilitate log parsing if (logs.contains(FINALIZE_SESSION_MARKER.toString())) { logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString()); } else { - logger.info(" -> {}", String.join("\n\t", logs)); + StringJoiner joiner = new StringJoiner("\n\t"); + while (!logs.isEmpty()) { + joiner.add(logs.poll()); + } + logger.info(" -> {}", joiner); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java index 344d00fa88..2ea57106ad 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java @@ -30,6 +30,7 @@ import java.nio.file.Paths; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; /** @@ -54,7 +55,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { * @param taskExecutionContext taskExecutionContext * @param logger logger */ - public PythonCommandExecutor(Consumer> logHandler, + public PythonCommandExecutor(Consumer> logHandler, TaskExecutionContext taskExecutionContext, Logger logger) { super(logHandler,taskExecutionContext,logger); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java index 5e297abbf0..9acd52a0c0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java @@ -26,7 +26,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; /** @@ -50,7 +50,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { * @param taskExecutionContext taskExecutionContext * @param logger logger */ - public ShellCommandExecutor(Consumer> logHandler, + public ShellCommandExecutor(Consumer> logHandler, TaskExecutionContext taskExecutionContext, Logger logger) { super(logHandler,taskExecutionContext,logger); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 2b9b9074dc..af1a5c529b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -16,9 +16,14 @@ */ package org.apache.dolphinscheduler.server.worker.task.sql; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import org.apache.commons.lang.StringUtils; +import static org.apache.dolphinscheduler.common.Constants.COMMA; +import static org.apache.dolphinscheduler.common.Constants.HIVE_CONF; +import static org.apache.dolphinscheduler.common.Constants.PASSWORD; +import static org.apache.dolphinscheduler.common.Constants.SEMICOLON; +import static org.apache.dolphinscheduler.common.Constants.STATUS; +import static org.apache.dolphinscheduler.common.Constants.USER; +import static org.apache.dolphinscheduler.common.enums.DbType.HIVE; + import org.apache.dolphinscheduler.alert.utils.MailUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -30,7 +35,11 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.sql.SqlBinds; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.task.sql.SqlType; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.EnumUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; @@ -41,16 +50,30 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.UDFUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.slf4j.Logger; -import java.sql.*; -import java.util.*; +import org.apache.commons.lang.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static org.apache.dolphinscheduler.common.Constants.*; -import static org.apache.dolphinscheduler.common.enums.DbType.HIVE; +import org.slf4j.Logger; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; /** * sql task */ diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java index 5a15a21cea..96ad1d4d62 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java @@ -26,6 +26,8 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.Date; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import org.junit.Assert; import org.junit.Before; @@ -199,4 +201,25 @@ public class SqoopTaskTest { } } + @Test + public void testLogHandler() throws InterruptedException { + LinkedBlockingQueue loggerBuffer = new LinkedBlockingQueue<>(); + Thread thread1 = new Thread(() -> { + for (int i = 0; i < 10; i++) { + loggerBuffer.add("test add log"); + } + }); + Thread thread2 = new Thread(() -> { + for (int i = 0; i < 10; i++) { + sqoopTask.logHandle(loggerBuffer); + } + }); + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + // if no exception throw, assert true + Assert.assertTrue(true); + } + }