diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index 0d74489f4b..5738d29555 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -379,13 +379,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst for (TaskInstance taskInstance : needToDeleteTaskInstances) { // delete log if (StringUtils.isNotEmpty(taskInstance.getLogPath())) { - try { - logClient.removeTaskLog(Host.of(taskInstance.getHost()), taskInstance.getLogPath()); - } catch (Exception e) { - logger.error( - "Remove task log error, meet an unknown exception, taskInstanceId: {}, host: {}, logPath: {}", - taskInstance.getId(), taskInstance.getHost(), taskInstance.getLogPath(), e); - } + logClient.removeTaskLog(Host.of(taskInstance.getHost()), taskInstance.getLogPath()); } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java index f1f1e54a5f..114f5c1640 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java @@ -177,29 +177,38 @@ public class LogClient implements AutoCloseable { * * @param host host * @param path path - * @return remove task status */ - public Boolean removeTaskLog(@NonNull Host host, String path) { - logger.info("Remove task log from host: {} logPath {}", host, path); + public void removeTaskLog(@NonNull Host host, String path) { + logger.info("Begin remove task log from host: {} logPath {}", host, path); RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path); try { Command command = request.convert2Command(); - Command response = this.client.sendSync(host, command, LOG_REQUEST_TIMEOUT); - if (response != null) { - RemoveTaskLogResponseCommand taskLogResponse = + client.sendAsync(host, command, LOG_REQUEST_TIMEOUT, responseFuture -> { + if (responseFuture.getCause() != null) { + logger.error("Remove task log from host: {} logPath {} error, meet an unknown exception", host, + path, responseFuture.getCause()); + return; + } + Command response = responseFuture.getResponseCommand(); + if (response == null) { + logger.error("Remove task log from host: {} logPath {} error, response is null", host, path); + return; + } + RemoveTaskLogResponseCommand removeTaskLogResponse = JSONUtils.parseObject(response.getBody(), RemoveTaskLogResponseCommand.class); - return taskLogResponse.getStatus(); - } - return false; - } catch (InterruptedException ex) { + if (removeTaskLogResponse.getStatus()) { + logger.info("Success remove task log from host: {} logPath {}", host, path); + } else { + logger.error("Remove task log from host: {} logPath {} error", host, path); + } + }); + } catch (InterruptedException interruptedException) { Thread.currentThread().interrupt(); - logger.error( - "Remove task log from host: {}, logPath: {} error, the current thread has been interrupted", - host, path, ex); - return false; + logger.error("Remove task log from host: {} logPath {} error, the current thread has been interrupted", + host, + path, interruptedException); } catch (Exception e) { logger.error("Remove task log from host: {}, logPath: {} error", host, path, e); - return false; } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 359a36cde5..ce8360ad64 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -516,17 +516,10 @@ public class ProcessServiceImpl implements ProcessService { } for (TaskInstance taskInstance : taskInstanceList) { String taskLogPath = taskInstance.getLogPath(); - if (Strings.isNullOrEmpty(taskInstance.getHost())) { + if (StringUtils.isEmpty(taskInstance.getHost()) || StringUtils.isEmpty(taskLogPath)) { continue; } - try { - Host host = Host.of(taskInstance.getHost()); - logClient.removeTaskLog(host, taskLogPath); - } catch (Exception e) { - logger.error( - "Remove task log error, meet an unknown exception, taskInstanceId: {}, host: {}, logPath: {}", - taskInstance.getId(), taskInstance.getHost(), taskInstance.getLogPath(), e); - } + logClient.removeTaskLog(Host.of(taskInstance.getHost()), taskLogPath); } } @@ -2429,8 +2422,6 @@ public class ProcessServiceImpl implements ProcessService { /** * the first time (when submit the task ) get the resource of the task group - * - * @param taskId task id */ @Override public boolean acquireTaskGroup(int taskInstanceId, diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java index faaad4fd7e..3864816f67 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java @@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand; -import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand; import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand; import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.factory.NettyRemotingClientFactory; @@ -136,7 +135,7 @@ public class LogClientTest { } @Test - public void testRemoveTaskLog() throws Exception { + public void testRemoveTaskLog() { try ( MockedStatic mockedNettyRemotingClientFactory = @@ -144,15 +143,9 @@ public class LogClientTest { NettyRemotingClient remotingClient = Mockito.mock(NettyRemotingClient.class); mockedNettyRemotingClientFactory.when(NettyRemotingClientFactory::buildNettyRemotingClient) .thenReturn(remotingClient); - Command command = new Command(); - command.setBody(JSONUtils.toJsonByteArray(new RemoveTaskLogResponseCommand(true))); - Mockito.when( - remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) - .thenReturn(command); LogClient logClient = new LogClient(); - Boolean status = logClient.removeTaskLog(Host.of("localhost:1234"), "/log/path"); - Assertions.assertTrue(status); + Assertions.assertDoesNotThrow(() -> logClient.removeTaskLog(Host.of("localhost:1234"), "/log/path")); } } }