diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java index 211f0a08a8..c9e4ebf434 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java @@ -19,6 +19,10 @@ package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.Constants; +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -26,6 +30,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * logger utils @@ -36,6 +41,8 @@ public class LoggerUtils { throw new UnsupportedOperationException("Construct LoggerUtils"); } + private static final Logger logger = LoggerFactory.getLogger(LoggerUtils.class); + /** * rules for extracting application ID */ @@ -101,6 +108,26 @@ public class LoggerUtils { return appIds; } + /** + * read whole file content + * + * @param filePath file path + * @return whole file content + */ + public static String readWholeFileContent(String filePath) { + String line; + StringBuilder sb = new StringBuilder(); + try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)))) { + while ((line = br.readLine()) != null) { + sb.append(line + "\r\n"); + } + return sb.toString(); + } catch (IOException e) { + logger.error("read file error", e); + } + return ""; + } + public static void logError(Optional optionalLogger , String error) { optionalLogger.ifPresent((Logger logger) -> logger.error(error)); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java index 5a80e388ba..811dff5895 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java @@ -14,30 +14,72 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.utils; +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Optional; + import org.junit.Assert; import org.junit.Test; +import org.junit.Test.None; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - +@RunWith(PowerMockRunner.class) +@PrepareForTest({LoggerUtils.class}) public class LoggerUtilsTest { private Logger logger = LoggerFactory.getLogger(LoggerUtilsTest.class); @Test public void buildTaskId() { - String taskId = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,79,4084,15210); + String taskId = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, 79, 4084, 15210); - Assert.assertEquals(" - [taskAppId=TASK-79-4084-15210]", taskId); + Assert.assertEquals(" - [taskAppId=TASK-79-4084-15210]", taskId); } @Test public void getAppIds() { - List appIdList = LoggerUtils.getAppIds("Running job: application_1_1",logger); - Assert.assertEquals("application_1_1", appIdList.get(0)); + List appIdList = LoggerUtils.getAppIds("Running job: application_1_1", logger); + Assert.assertEquals("application_1_1", appIdList.get(0)); + + } + + @Test + public void testReadWholeFileContent() throws Exception { + BufferedReader bufferedReader = PowerMockito.mock(BufferedReader.class); + PowerMockito.whenNew(BufferedReader.class).withAnyArguments().thenReturn(bufferedReader); + PowerMockito.when(bufferedReader.readLine()).thenReturn("").thenReturn(null); + FileInputStream fileInputStream = PowerMockito.mock(FileInputStream.class); + PowerMockito.whenNew(FileInputStream.class).withAnyArguments().thenReturn(fileInputStream); + + InputStreamReader inputStreamReader = PowerMockito.mock(InputStreamReader.class); + PowerMockito.whenNew(InputStreamReader.class).withAnyArguments().thenReturn(inputStreamReader); + + String log = LoggerUtils.readWholeFileContent("/tmp/log"); + Assert.assertNotNull(log); + + PowerMockito.when(bufferedReader.readLine()).thenThrow(new IOException()); + log = LoggerUtils.readWholeFileContent("/tmp/log"); + Assert.assertNotNull(log); + } + + @Test(expected = None.class) + public void testLogError() { + Optional loggerOptional = Optional.of(this.logger); + LoggerUtils.logError(loggerOptional, "error message"); + LoggerUtils.logError(loggerOptional, new RuntimeException("error message")); + LoggerUtils.logError(loggerOptional, "error message", new RuntimeException("runtime exception")); + LoggerUtils.logInfo(loggerOptional, "info message"); } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java index e9a85f456f..c60a4479c1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.log; import org.apache.dolphinscheduler.common.utils.IOUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand; @@ -31,13 +32,11 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand; import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collections; @@ -86,7 +85,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { case VIEW_WHOLE_LOG_REQUEST: ViewLogRequestCommand viewLogRequest = JSONUtils.parseObject( command.getBody(), ViewLogRequestCommand.class); - String msg = readWholeFileContent(viewLogRequest.getPath()); + String msg = LoggerUtils.readWholeFileContent(viewLogRequest.getPath()); ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg); channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque())); break; @@ -182,27 +181,4 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { return Collections.emptyList(); } - /** - * read whole file content - * - * @param filePath file path - * @return whole file content - */ - private String readWholeFileContent(String filePath) { - BufferedReader br = null; - String line; - StringBuilder sb = new StringBuilder(); - try { - br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); - while ((line = br.readLine()) != null) { - sb.append(line + "\r\n"); - } - return sb.toString(); - } catch (IOException e) { - logger.error("read file error",e); - } finally { - IOUtils.closeQuietly(br); - } - return ""; - } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessorTest.java new file mode 100644 index 0000000000..e24539558c --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessorTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.log; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand; + +import org.junit.Test; +import org.junit.Test.None; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import io.netty.channel.Channel; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({LoggerUtils.class}) +public class LoggerRequestProcessorTest { + + @Test(expected = None.class) + public void testProcessViewWholeLogRequest() { + Channel channel = PowerMockito.mock(Channel.class); + PowerMockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null); + PowerMockito.mockStatic(LoggerUtils.class); + PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); + + ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand("/log/path"); + + Command command = new Command(); + command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST); + command.setBody(JSONUtils.toJsonByteArray(logRequestCommand)); + + LoggerRequestProcessor loggerRequestProcessor = new LoggerRequestProcessor(); + loggerRequestProcessor.process(channel, command); + } +} \ No newline at end of file diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java index 75753c78d6..5f0f5453be 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.service.log; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand; @@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand; import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.remote.utils.IPUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,12 +118,16 @@ public class LogClientService { String result = ""; final Host address = new Host(host, port); try { - Command command = request.convert2Command(); - Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); - if (response != null) { - ViewLogResponseCommand viewLog = JSONUtils.parseObject( - response.getBody(), ViewLogResponseCommand.class); - return viewLog.getMsg(); + if (IPUtils.getLocalHost().equals(host)) { + result = LoggerUtils.readWholeFileContent(request.getPath()); + } else { + Command command = request.convert2Command(); + Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); + if (response != null) { + ViewLogResponseCommand viewLog = JSONUtils.parseObject( + response.getBody(), ViewLogResponseCommand.class); + result = viewLog.getMsg(); + } } } catch (Exception e) { logger.error("view log error", e); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java new file mode 100644 index 0000000000..58d7c1e19c --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.log; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.remote.NettyRemotingClient; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand; +import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand; +import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand; +import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.remote.utils.IPUtils; + +import java.nio.charset.StandardCharsets; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.Test.None; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({LogClientService.class, IPUtils.class, LoggerUtils.class, NettyRemotingClient.class}) +public class LogClientServiceTest { + + @Test + public void testViewLogFromLocal() { + String localMachine = "LOCAL_MACHINE"; + int port = 1234; + String path = "/tmp/log"; + + PowerMockito.mockStatic(IPUtils.class); + PowerMockito.when(IPUtils.getLocalHost()).thenReturn(localMachine); + PowerMockito.mockStatic(LoggerUtils.class); + PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("application_xx_11"); + + LogClientService logClientService = new LogClientService(); + String log = logClientService.viewLog(localMachine, port, path); + Assert.assertNotNull(log); + } + + @Test + public void testViewLogFromRemote() throws Exception { + String localMachine = "LOCAL_MACHINE"; + int port = 1234; + String path = "/tmp/log"; + + PowerMockito.mockStatic(IPUtils.class); + PowerMockito.when(IPUtils.getLocalHost()).thenReturn(localMachine + "1"); + + NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class); + PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient); + + Command command = new Command(); + command.setBody(JSONUtils.toJsonString(new ViewLogResponseCommand("")).getBytes(StandardCharsets.UTF_8)); + PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) + .thenReturn(command); + LogClientService logClientService = new LogClientService(); + String log = logClientService.viewLog(localMachine, port, path); + Assert.assertNotNull(log); + } + + @Test(expected = None.class) + public void testClose() throws Exception { + NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class); + PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient); + PowerMockito.doNothing().when(remotingClient).close(); + + LogClientService logClientService = new LogClientService(); + logClientService.close(); + } + + @Test + public void testRollViewLog() throws Exception { + NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class); + PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient); + + Command command = new Command(); + command.setBody(JSONUtils.toJsonByteArray(new RollViewLogResponseCommand("success"))); + PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) + .thenReturn(command); + + LogClientService logClientService = new LogClientService(); + String msg = logClientService.rollViewLog("localhost", 1234, "/tmp/log", 0, 10); + Assert.assertNotNull(msg); + } + + @Test + public void testGetLogBytes() throws Exception { + NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class); + PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient); + + Command command = new Command(); + command.setBody(JSONUtils.toJsonByteArray(new GetLogBytesResponseCommand("log".getBytes(StandardCharsets.UTF_8)))); + PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) + .thenReturn(command); + + LogClientService logClientService = new LogClientService(); + byte[] logBytes = logClientService.getLogBytes("localhost", 1234, "/tmp/log"); + Assert.assertNotNull(logBytes); + } + + @Test + public void testRemoveTaskLog() throws Exception { + NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class); + PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient); + + Command command = new Command(); + command.setBody(JSONUtils.toJsonByteArray(new RemoveTaskLogResponseCommand(true))); + PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) + .thenReturn(command); + + LogClientService logClientService = new LogClientService(); + Boolean status = logClientService.removeTaskLog("localhost", 1234, "/log/path"); + Assert.assertTrue(status); + } + + @Test + public void testIsRunning() { + LogClientService logClientService = new LogClientService(); + Assert.assertTrue(logClientService.isRunning()); + } +} diff --git a/pom.xml b/pom.xml index 2ab57bcde6..525b4867ab 100644 --- a/pom.xml +++ b/pom.xml @@ -908,6 +908,7 @@ **/server/entity/SQLTaskExecutionContextTest.java **/server/log/MasterLogFilterTest.java **/server/log/SensitiveDataConverterTest.java + **/server/log/LoggerRequestProcessorTest.java **/server/log/TaskLogFilterTest.java **/server/log/WorkerLogFilterTest.java @@ -966,7 +967,7 @@ **/service/zk/RegisterOperatorTest.java **/service/queue/TaskUpdateQueueTest.java **/service/queue/PeerTaskInstancePriorityQueueTest.java - + **/service/log/LogClientServiceTest.java **/service/alert/AlertClientServiceTest.java **/dao/mapper/DataSourceUserMapperTest.java