diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 037bde6c73..73f2e700f0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -84,6 +84,8 @@ public abstract class AbstractCommandExecutor { * log list */ protected final List logBuffer; + + protected boolean logOutputIsScuccess = false; /** * SHELL result string @@ -348,34 +350,46 @@ public abstract class AbstractCommandExecutor { */ private void parseProcessOutput(Process process) { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); - ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); - parseProcessOutputExecutorService.submit(new Runnable() { - @Override - public void run() { - BufferedReader inReader = null; + ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService"); + getOutputLogService.submit(() -> { + BufferedReader inReader = null; + try { + inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); + String line; + logBuffer.add("welcome to use bigdata scheduling system..."); + while ((line = inReader.readLine()) != null) { + if (line.startsWith("${setValue(")) { + varPool.append(line.substring("${setValue(".length(), line.length() - 2)); + varPool.append("$VarPool$"); + } else { + logBuffer.add(line); + taskResultString = line; + } + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + logOutputIsScuccess = true; + close(inReader); + } + }); + getOutputLogService.shutdown(); - try { - inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); - String line; - - long lastFlushTime = System.currentTimeMillis(); - - while ((line = inReader.readLine()) != null) { - if (line.startsWith("${setValue(")) { - varPool.append(line.substring("${setValue(".length(), line.length() - 2)); - varPool.append("$VarPool$"); - } else { - logBuffer.add(line); - taskResultString = line; - lastFlushTime = flush(lastFlushTime); - } + ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); + parseProcessOutputExecutorService.submit(() -> { + try { + long lastFlushTime = System.currentTimeMillis(); + while (logBuffer.size() > 0 || !logOutputIsScuccess) { + if (logBuffer.size() > 0) { + lastFlushTime = flush(lastFlushTime); + } else { + Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL); } - } catch (Exception e) { - logger.error(e.getMessage(), e); - } finally { - clear(); - close(inReader); } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + clear(); } }); parseProcessOutputExecutorService.shutdown(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java index ed3479ee0d..ac91e1d9b8 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java @@ -16,39 +16,61 @@ */ package org.apache.dolphinscheduler.server.worker.shell; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Method; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.AbstractCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.dolphinscheduler.common.utils.*; +import org.springframework.context.ApplicationContext; import java.util.Date; +import java.util.List; /** - * python shell command executor test + * python shell command executor test */ -@Ignore +@RunWith(PowerMockRunner.class) +@PrepareForTest(OSUtils.class) +@PowerMockIgnore({"javax.management.*"}) public class ShellCommandExecutorTest { private static final Logger logger = LoggerFactory.getLogger(ShellCommandExecutorTest.class); private ProcessService processService = null; + private ApplicationContext applicationContext; @Before - public void before(){ - processService = SpringApplicationContext.getBean(ProcessService.class); + public void before() { + applicationContext = PowerMockito.mock(ApplicationContext.class); + processService = PowerMockito.mock(ProcessService.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + PowerMockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); } + @Ignore @Test public void test() throws Exception { @@ -63,19 +85,18 @@ public class ShellCommandExecutorTest { taskProps.setTaskInstanceId(7657); - TaskInstance taskInstance = processService.findTaskInstanceById(7657); String taskJson = taskInstance.getTaskJson(); - TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); - taskProps.setTaskParams(taskNode.getParams()); +// TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class); +// taskProps.setTaskParams(taskNode.getParams()); // custom logger - Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskInstance.getProcessDefinitionId(), - taskInstance.getProcessInstanceId(), - taskInstance.getId())); +// Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, +// taskInstance.getProcessDefinitionId(), +// taskInstance.getProcessInstanceId(), +// taskInstance.getId())); // AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); @@ -91,14 +112,141 @@ public class ShellCommandExecutorTest { task.handle(); ExecutionStatus status = ExecutionStatus.SUCCESS; - if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ + if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS) { status = ExecutionStatus.SUCCESS; - }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ + } else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL) { status = ExecutionStatus.KILL; - }else { + } else { status = ExecutionStatus.FAILURE; } logger.info(status.toString()); } -} \ No newline at end of file + + @Test + public void testParseProcessOutput() { + Class shellCommandExecutorClass = AbstractCommandExecutor.class; + try { + + Method method = shellCommandExecutorClass.getDeclaredMethod("parseProcessOutput", Process.class); + method.setAccessible(true); + Object[] arg1s = {new Process() { + @Override + public OutputStream getOutputStream() { + return new OutputStream() { + @Override + public void write(int b) throws IOException { + logger.info("unit test"); + } + }; + } + + @Override + public InputStream getInputStream() { + return new InputStream() { + @Override + public int read() throws IOException { + return 0; + } + }; + } + + @Override + public InputStream getErrorStream() { + return null; + } + + @Override + public int waitFor() throws InterruptedException { + return 0; + } + + @Override + public int exitValue() { + return 0; + } + + @Override + public void destroy() { + logger.info("unit test"); + } + } }; + method.invoke(new AbstractCommandExecutor(null, new TaskExecutionContext(), logger) { + @Override + protected String buildCommandFilePath() { + return null; + } + + @Override + protected String commandInterpreter() { + return null; + } + + @Override + protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException { + logger.info("unit test"); + } + }, arg1s); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testFindAppId() { + Class shellCommandExecutorClass = AbstractCommandExecutor.class; + try { + + Method method = shellCommandExecutorClass.getDeclaredMethod("findAppId", new Class[]{String.class}); + method.setAccessible(true); + Object[] arg1s = {"11111"}; + String result = (String) method.invoke(new AbstractCommandExecutor(null, null, null) { + @Override + protected String buildCommandFilePath() { + return null; + } + + @Override + protected String commandInterpreter() { + return null; + } + + @Override + protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException { + logger.info("unit test"); + } + }, arg1s); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testConvertFile2List() { + Class shellCommandExecutorClass = AbstractCommandExecutor.class; + try { + Method method = shellCommandExecutorClass.getDeclaredMethod("convertFile2List", String.class); + method.setAccessible(true); + Object[] arg1s = {"/opt/1.txt"}; + List result = (List) method.invoke(new AbstractCommandExecutor(null, null, null) { + @Override + protected String buildCommandFilePath() { + return null; + } + + @Override + protected String commandInterpreter() { + return null; + } + + @Override + protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException { + logger.info("unit test"); + } + }, arg1s); + Assert.assertTrue(true); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java index 8c734af2ce..160e92f612 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java @@ -27,11 +27,11 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; import java.sql.DriverManager; import java.util.Date; import java.util.HashMap; import java.util.Map; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -103,6 +103,8 @@ public class ShellTaskTest { public void testComplementData() throws Exception { shellTask = new ShellTask(taskExecutionContext, logger); shellTask.init(); + shellCommandExecutor.isSuccessOfYarnState(new ArrayList<>()); + shellCommandExecutor.isSuccessOfYarnState(null); PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); shellTask.handle(); }