From dc7a0a0f0e3a36fdda7a568395c8f401034392a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=B8=85?= <62982788+597365581@users.noreply.github.com> Date: Thu, 18 Feb 2021 10:54:01 +0800 Subject: [PATCH] Update AbstractCommandExecutor.java --- .../worker/task/AbstractCommandExecutor.java | 60 ++++++++----------- 1 file changed, 25 insertions(+), 35 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 5b09be85ae..2ef9884e6d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -339,50 +339,40 @@ public abstract class AbstractCommandExecutor { private void parseProcessOutput(Process process) { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService"); - getOutputLogService.submit(new Runnable() { - @Override - public void run() { - BufferedReader inReader = null; + getOutputLogService.submit(() -> { + BufferedReader inReader = null; - try { - inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); - String line; - while ((line = inReader.readLine()) != null) { - if (line.startsWith("${setValue(")) { - varPool.append(line.substring("${setValue(".length(), line.length() - 2)); - varPool.append("$VarPool$"); - } else { - logBuffer.add(line); - } - } - } catch (Exception e) { - logger.error(e.getMessage(), e); - } finally { - logOutputIsScuccess = true; - close(inReader); + try { + inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); + String line; + logBuffer.add("welcome to use bigdata scheduling system..."); + while ((line = inReader.readLine()) != null) { + logBuffer.add(line); } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + logOutputIsScuccess = true; + close(inReader); } }); getOutputLogService.shutdown(); ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); - parseProcessOutputExecutorService.submit(new Runnable() { - @Override - public void run() { - try { - long lastFlushTime = System.currentTimeMillis(); - while (logBuffer.size() > 0 || !logOutputIsScuccess) { - if (logBuffer.size() > 0) { - lastFlushTime = flush(lastFlushTime); - } else { - Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL); - } + parseProcessOutputExecutorService.submit(() -> { + try { + long lastFlushTime = System.currentTimeMillis(); + while (logBuffer.size() > 0 || !logOutputIsScuccess) { + if (logBuffer.size() > 0) { + lastFlushTime = flush(lastFlushTime); + } else { + Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL); } - } catch (Exception e) { - logger.error(e.getMessage(), e); - } finally { - clear(); } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + clear(); } }); parseProcessOutputExecutorService.shutdown();