diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 8dbf7220b0..cd57be9181 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -82,6 +82,8 @@ public abstract class AbstractCommandExecutor { * log list */ protected final List logBuffer; + + protected boolean logOutputIsScuccess = false; /** * taskExecutionContext @@ -336,34 +338,49 @@ public abstract class AbstractCommandExecutor { */ private void parseProcessOutput(Process process) { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); - ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); - parseProcessOutputExecutorService.submit(new Runnable() { + ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService"); + getOutputLogService.submit(new Runnable() { @Override public void run() { BufferedReader inReader = null; try { inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); - String line = null; + String line; + while ((line = inReader.readLine()) != null) { + if (line.startsWith("${setValue(")) { + varPool.append(line.substring("${setValue(".length(), line.length() - 2)); + varPool.append("$VarPool$"); + } else { + logBuffer.add(line); + } + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + logOutputIsScuccess = true; + close(inReader); + } + } + }); + ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); + parseProcessOutputExecutorService.submit(new Runnable() { + @Override + public void run() { + try { long lastFlushTime = System.currentTimeMillis(); - while (logBuffer.size() > 0 || (line = inReader.readLine()) != null) { - if (null != line) { - if (line.startsWith("${setValue(")) { - varPool.append(line.substring("${setValue(".length(), line.length() - 2)); - varPool.append("$VarPool$"); - } else { - logBuffer.add(line); - } - line = null; + while (logBuffer.size() > 0 || !logOutputIsScuccess) { + if (logBuffer.size() > 0) { + lastFlushTime = flush(lastFlushTime); + } else { + Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL); } - lastFlushTime = flush(lastFlushTime); } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { clear(); - close(inReader); } } });