From 43f5f24529508bc36fef005c551cb0e06d0d75ca Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 5 Dec 2023 21:06:39 +0800 Subject: [PATCH] Add dolphinscheduler-extract-common module (#15266) --- .../api/service/impl/LoggerServiceImpl.java | 130 +++++------------- .../service/impl/TaskInstanceServiceImpl.java | 19 +-- .../api/service/LoggerServiceTest.java | 116 ++++++++++------ dolphinscheduler-extract/README.md | 21 +++ .../dolphinscheduler-extract-common/pom.xml | 41 ++++++ .../extract/common/ILogService.java} | 17 +-- .../common}/transportor/GetAppIdRequest.java | 4 +- .../common}/transportor/GetAppIdResponse.java | 4 +- .../TaskInstanceLogFileDownloadRequest.java | 3 +- .../TaskInstanceLogFileDownloadResponse.java | 3 +- .../TaskInstanceLogPageQueryRequest.java | 9 +- .../TaskInstanceLogPageQueryResponse.java | 2 +- .../dolphinscheduler-extract-master/pom.xml | 5 + .../extract/master/IMasterLogService.java | 38 ----- ...gicTaskInstanceLogFileDownloadRequest.java | 32 ----- ...icTaskInstanceLogFileDownloadResponse.java | 30 ---- .../LogicTaskInstanceLogPageQueryRequest.java | 35 ----- ...LogicTaskInstanceLogPageQueryResponse.java | 31 ----- .../dolphinscheduler-extract-worker/pom.xml | 5 + dolphinscheduler-extract/pom.xml | 1 + .../master/rpc/MasterLogServiceImpl.java | 41 +++--- .../master/service/WorkerFailoverService.java | 12 +- .../service/process/ProcessServiceImpl.java | 16 +-- .../worker/rpc/WorkerLogServiceImpl.java | 37 ++--- 24 files changed, 256 insertions(+), 396 deletions(-) create mode 100644 dolphinscheduler-extract/README.md create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-common/pom.xml rename dolphinscheduler-extract/{dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/IWorkerLogService.java => dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/ILogService.java} (76%) rename dolphinscheduler-extract/{dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker => dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common}/transportor/GetAppIdRequest.java (91%) rename dolphinscheduler-extract/{dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker => dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common}/transportor/GetAppIdResponse.java (94%) rename dolphinscheduler-extract/{dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker => dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common}/transportor/TaskInstanceLogFileDownloadRequest.java (94%) rename dolphinscheduler-extract/{dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker => dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common}/transportor/TaskInstanceLogFileDownloadResponse.java (94%) rename dolphinscheduler-extract/{dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker => dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common}/transportor/TaskInstanceLogPageQueryRequest.java (89%) rename dolphinscheduler-extract/{dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker => dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common}/transportor/TaskInstanceLogPageQueryResponse.java (94%) delete mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterLogService.java delete mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadRequest.java delete mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadResponse.java delete mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryRequest.java delete mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryResponse.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java index 24dc41ce3a..e9222c715e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java @@ -26,8 +26,6 @@ import org.apache.dolphinscheduler.api.service.LoggerService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils; -import org.apache.dolphinscheduler.common.utils.LogUtils; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; @@ -37,25 +35,15 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; -import org.apache.dolphinscheduler.extract.master.IMasterLogService; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadResponse; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryResponse; -import org.apache.dolphinscheduler.extract.worker.IWorkerLogService; -import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadRequest; -import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadResponse; -import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryRequest; -import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryResponse; -import org.apache.dolphinscheduler.plugin.task.api.utils.TaskUtils; +import org.apache.dolphinscheduler.extract.common.ILogService; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; -import java.io.File; -import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -108,8 +96,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService log.error("Host of task instance is null, taskInstanceId:{}.", taskInstId); return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL); } - Project project = projectMapper.queryProjectByTaskInstanceId(taskInstId); - projectService.checkProjectAndAuthThrowException(loginUser, project, VIEW_LOG); + projectService.checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); Result result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); String log = queryLog(taskInstance, skipLineNum, limit); int lineNum = log.split("\\r\\n").length; @@ -199,7 +186,6 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService */ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) { final String logPath = taskInstance.getLogPath(); - final String host = taskInstance.getHost(); log.info("Query task instance log, taskInstanceId:{}, taskInstanceName:{}, host: {}, logPath:{}", taskInstance.getId(), taskInstance.getName(), taskInstance.getHost(), logPath); StringBuilder sb = new StringBuilder(); @@ -211,48 +197,24 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService sb.append(head); } - String logContent = null; - if (TaskUtils.isLogicTask(taskInstance.getTaskType())) { - IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(taskInstance.getHost(), IMasterLogService.class); - try { - LogicTaskInstanceLogPageQueryRequest logicTaskInstanceLogPageQueryRequest = - new LogicTaskInstanceLogPageQueryRequest(taskInstance.getId(), logPath, skipLineNum, limit); - LogicTaskInstanceLogPageQueryResponse logicTaskInstanceLogPageQueryResponse = - masterLogService.pageQueryLogicTaskInstanceLog(logicTaskInstanceLogPageQueryRequest); - logContent = logicTaskInstanceLogPageQueryResponse.getLogContent(); - } catch (Exception ex) { - log.error("Query LogicTaskInstance log error", ex); - } - } else { - IWorkerLogService iWorkerLogService = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(host, IWorkerLogService.class); - try { - TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest = - new TaskInstanceLogPageQueryRequest(taskInstance.getId(), logPath, skipLineNum, limit); - TaskInstanceLogPageQueryResponse taskInstanceLogPageQueryResponse = - iWorkerLogService.pageQueryTaskInstanceLog(taskInstanceLogPageQueryRequest); - logContent = taskInstanceLogPageQueryResponse.getLogContent(); - } catch (Exception ex) { - log.error("Query LogicTaskInstance log error", ex); + ILogService iLogService = + SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class); + try { + TaskInstanceLogPageQueryRequest request = TaskInstanceLogPageQueryRequest.builder() + .taskInstanceId(taskInstance.getId()) + .taskInstanceLogAbsolutePath(logPath) + .skipLineNum(skipLineNum) + .limit(limit) + .build(); + TaskInstanceLogPageQueryResponse response = iLogService.pageQueryTaskInstanceLog(request); + String logContent = response.getLogContent(); + if (logContent != null) { + sb.append(logContent); } + return sb.toString(); + } catch (Throwable ex) { + throw new ServiceException(Status.QUERY_TASK_INSTANCE_LOG_ERROR, ex); } - if (logContent == null && RemoteLogUtils.isRemoteLoggingEnable()) { - // When getting the log for the first time (skipLineNum=0) returns empty, get the log from remote target - try { - log.info("Get log {} from remote target", logPath); - RemoteLogUtils.getRemoteLog(logPath); - List lines = LogUtils.readPartFileContentFromLocal(logPath, skipLineNum, limit); - logContent = LogUtils.rollViewLogLines(lines); - FileUtils.delete(new File(logPath)); - } catch (IOException e) { - log.error("Error while getting log from remote target", e); - } - } - if (logContent != null) { - sb.append(logContent); - } - return sb.toString(); } /** @@ -271,45 +233,19 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8); byte[] logBytes = new byte[0]; - if (TaskUtils.isLogicTask(taskInstance.getTaskType())) { - IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(taskInstance.getHost(), IMasterLogService.class); - try { - LogicTaskInstanceLogFileDownloadRequest logicTaskInstanceLogFileDownloadRequest = - new LogicTaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath); - LogicTaskInstanceLogFileDownloadResponse logicTaskInstanceLogFileDownloadResponse = - masterLogService.getLogicTaskInstanceWholeLogFileBytes(logicTaskInstanceLogFileDownloadRequest); - logBytes = logicTaskInstanceLogFileDownloadResponse.getLogBytes(); - } catch (Exception ex) { - log.error("Query LogicTaskInstance log error", ex); - } - } else { - IWorkerLogService iWorkerLogService = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(host, IWorkerLogService.class); - try { - TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest = - new TaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath); - TaskInstanceLogFileDownloadResponse taskInstanceWholeLogFileBytes = - iWorkerLogService.getTaskInstanceWholeLogFileBytes(taskInstanceLogFileDownloadRequest); - logBytes = taskInstanceWholeLogFileBytes.getLogBytes(); - } catch (Exception ex) { - log.error("Query LogicTaskInstance log error", ex); - } - } - if ((logBytes == null || logBytes.length == 0) && RemoteLogUtils.isRemoteLoggingEnable()) { - // get task log from remote target - try { - log.info("Get log {} from remote target", logPath); - RemoteLogUtils.getRemoteLog(logPath); - File logFile = new File(logPath); - logBytes = FileUtils.readFileToByteArray(logFile); - FileUtils.delete(logFile); - } catch (IOException e) { - log.error("Error while getting log from remote target", e); - } + ILogService iLogService = + SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class); + try { + TaskInstanceLogFileDownloadRequest request = + new TaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath); + TaskInstanceLogFileDownloadResponse response = iLogService.getTaskInstanceWholeLogFileBytes(request); + logBytes = response.getLogBytes(); + return Bytes.concat(head, logBytes); + } catch (Exception ex) { + log.error("Download TaskInstance: {} Log Error", taskInstance.getName(), ex); + throw new ServiceException(Status.DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR); } - return Bytes.concat(head, logBytes); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index 026b54e701..330374779f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -44,16 +44,14 @@ import org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; -import org.apache.dolphinscheduler.extract.master.IMasterLogService; +import org.apache.dolphinscheduler.extract.common.ILogService; import org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator; import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; -import org.apache.dolphinscheduler.extract.worker.IWorkerLogService; import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest; import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse; import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest; import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; -import org.apache.dolphinscheduler.plugin.task.api.utils.TaskUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.lang3.StringUtils; @@ -381,18 +379,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst return; } for (TaskInstance taskInstance : needToDeleteTaskInstances) { - // delete log - if (StringUtils.isNotEmpty(taskInstance.getLogPath())) { - if (TaskUtils.isLogicTask(taskInstance.getTaskType())) { - IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(taskInstance.getHost(), IMasterLogService.class); - masterLogService.removeLogicTaskInstanceLog(taskInstance.getLogPath()); - } else { - IWorkerLogService workerLogService = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(taskInstance.getHost(), IWorkerLogService.class); - workerLogService.removeTaskInstanceLog(taskInstance.getLogPath()); - } - } + ILogService iLogService = + SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class); + iLogService.removeTaskInstanceLog(taskInstance.getLogPath()); } dqExecuteResultDao.deleteByWorkflowInstanceId(workflowInstanceId); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java index cd95511130..1eb7a2575b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java @@ -21,8 +21,10 @@ import static org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThro import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; +import org.apache.dolphinscheduler.api.AssertionsHelper; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; @@ -36,26 +38,34 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.extract.base.NettyRemotingServer; +import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; +import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; +import org.apache.dolphinscheduler.extract.common.ILogService; +import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest; +import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse; import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * logger service test - */ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class LoggerServiceTest { @@ -77,6 +87,45 @@ public class LoggerServiceTest { @Mock private TaskDefinitionMapper taskDefinitionMapper; + private NettyRemotingServer nettyRemotingServer; + + @BeforeEach + public void setUp() { + nettyRemotingServer = new NettyRemotingServer(new NettyServerConfig(8080)); + nettyRemotingServer.start(); + SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery = + new SpringServerMethodInvokerDiscovery(nettyRemotingServer); + springServerMethodInvokerDiscovery.postProcessAfterInitialization(new ILogService() { + + @Override + public TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest) { + return new TaskInstanceLogFileDownloadResponse(new byte[0]); + } + + @Override + public TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest) { + return new TaskInstanceLogPageQueryResponse(); + } + + @Override + public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) { + return new GetAppIdResponse(); + } + + @Override + public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) { + + } + }, "iLogServiceImpl"); + } + + @AfterEach + public void tearDown() { + if (nettyRemotingServer != null) { + nettyRemotingServer.close(); + } + } + @Test public void testQueryLog() { @@ -101,27 +150,20 @@ public class LoggerServiceTest { // PROJECT_NOT_EXIST taskInstance.setHost("127.0.0.1:8080"); taskInstance.setLogPath("/temp/log"); - Project project = getProject(1); - Mockito.when(projectMapper.queryProjectByTaskInstanceId(1)).thenReturn(project); - try { - Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) - .checkProjectAndAuthThrowException(loginUser, project, VIEW_LOG); - loggerService.queryLog(loginUser, 1, 1, 1); - } catch (ServiceException serviceException) { - Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode()); - } + doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) + .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); + AssertionsHelper.assertThrowsServiceException(Status.PROJECT_NOT_EXIST, + () -> loggerService.queryLog(loginUser, 1, 1, 1)); // USER_NO_OPERATION_PERM - try { - Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PERM)).when(projectService) - .checkProjectAndAuthThrowException(loginUser, project, VIEW_LOG); - loggerService.queryLog(loginUser, 1, 1, 1); - } catch (ServiceException serviceException) { - Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(), serviceException.getCode()); - } + doThrow(new ServiceException(Status.USER_NO_OPERATION_PERM)).when(projectService) + .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); + AssertionsHelper.assertThrowsServiceException(Status.USER_NO_OPERATION_PERM, + () -> loggerService.queryLog(loginUser, 1, 1, 1)); // SUCCESS - doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, project, VIEW_LOG); + doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), + VIEW_LOG); when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); result = loggerService.queryLog(loginUser, 1, 1, 1); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); @@ -158,30 +200,22 @@ public class LoggerServiceTest { // PROJECT_NOT_EXIST taskInstance.setHost("127.0.0.1:8080"); taskInstance.setLogPath("/temp/log"); - try { - Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) - .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), DOWNLOAD_LOG); - loggerService.queryLog(loginUser, 1, 1, 1); - } catch (ServiceException serviceException) { - Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode()); - } + doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) + .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); + AssertionsHelper.assertThrowsServiceException(Status.PROJECT_NOT_EXIST, + () -> loggerService.queryLog(loginUser, 1, 1, 1)); // USER_NO_OPERATION_PERM - Project project = getProject(1); - when(projectMapper.queryProjectByTaskInstanceId(1)).thenReturn(project); - try { - Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PERM)).when(projectService) - .checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); - loggerService.queryLog(loginUser, 1, 1, 1); - } catch (ServiceException serviceException) { - Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(), serviceException.getCode()); - } + doThrow(new ServiceException(Status.USER_NO_OPERATION_PERM)).when(projectService) + .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); + AssertionsHelper.assertThrowsServiceException(Status.USER_NO_OPERATION_PERM, + () -> loggerService.queryLog(loginUser, 1, 1, 1)); // SUCCESS - doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); - when(projectMapper.queryProjectByTaskInstanceId(1)).thenReturn(project); - byte[] result = loggerService.getLogBytes(loginUser, 1); - Assertions.assertEquals(47, result.length); + doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), + DOWNLOAD_LOG); + byte[] logBytes = loggerService.getLogBytes(loginUser, 1); + Assertions.assertEquals(47, logBytes.length); } @Test diff --git a/dolphinscheduler-extract/README.md b/dolphinscheduler-extract/README.md new file mode 100644 index 0000000000..0bf173d560 --- /dev/null +++ b/dolphinscheduler-extract/README.md @@ -0,0 +1,21 @@ +# Introduction + +This module contains the RPC interface which can be used to communicate with the DolphinScheduler server. + +# [dolphinscheduler-extract-base](dolphinscheduler-extract-base) + +The base module contains the basic interfaces for how to define the RPC client and server. + +# [dolphinscheduler-extract-common](dolphinscheduler-extract-common) + +The common module contains the common interface which can be used by both the master and worker. + +# [dolphinscheduler-extract-master](dolphinscheduler-extract-master) + +This module contains the RPC interface which can be used by communicate with the master server. + +# [dolphinscheduler-extract-worker](dolphinscheduler-extract-worker) +This module contains the RPC interface which can be used by communicate with the worker server. + +# [dolphinscheduler-extract-alert](dolphinscheduler-extract-alert) +This module contains the RPC interface which can be used by communicate with the alert server. diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-common/pom.xml b/dolphinscheduler-extract/dolphinscheduler-extract-common/pom.xml new file mode 100644 index 0000000000..d7d339ff9f --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/pom.xml @@ -0,0 +1,41 @@ + + + + 4.0.0 + + org.apache.dolphinscheduler + dolphinscheduler-extract + dev-SNAPSHOT + + + org.apache.dolphinscheduler + dolphinscheduler-extract-common + + + + org.apache.dolphinscheduler + dolphinscheduler-extract-base + ${project.version} + + + + diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/IWorkerLogService.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/ILogService.java similarity index 76% rename from dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/IWorkerLogService.java rename to dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/ILogService.java index d03191888a..c602f15bb2 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/IWorkerLogService.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/ILogService.java @@ -15,19 +15,19 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.worker; +package org.apache.dolphinscheduler.extract.common; import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.RpcService; -import org.apache.dolphinscheduler.extract.worker.transportor.GetAppIdRequest; -import org.apache.dolphinscheduler.extract.worker.transportor.GetAppIdResponse; -import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadRequest; -import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadResponse; -import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryRequest; -import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryResponse; +import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest; +import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse; @RpcService -public interface IWorkerLogService { +public interface ILogService { @RpcMethod TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest); @@ -40,4 +40,5 @@ public interface IWorkerLogService { @RpcMethod void removeTaskInstanceLog(String taskInstanceLogAbsolutePath); + } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/GetAppIdRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/GetAppIdRequest.java similarity index 91% rename from dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/GetAppIdRequest.java rename to dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/GetAppIdRequest.java index 1148a6f24c..d6f0077b63 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/GetAppIdRequest.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/GetAppIdRequest.java @@ -15,13 +15,15 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.worker.transportor; +package org.apache.dolphinscheduler.extract.common.transportor; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @Data +@Builder @NoArgsConstructor @AllArgsConstructor public class GetAppIdRequest { diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/GetAppIdResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/GetAppIdResponse.java similarity index 94% rename from dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/GetAppIdResponse.java rename to dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/GetAppIdResponse.java index 3b95a4d66e..db3fc2af89 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/GetAppIdResponse.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/GetAppIdResponse.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.worker.transportor; +package org.apache.dolphinscheduler.extract.common.transportor; import java.util.List; @@ -24,8 +24,8 @@ import lombok.Data; import lombok.NoArgsConstructor; @Data -@NoArgsConstructor @AllArgsConstructor +@NoArgsConstructor public class GetAppIdResponse { private List appIds; diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogFileDownloadRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogFileDownloadRequest.java similarity index 94% rename from dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogFileDownloadRequest.java rename to dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogFileDownloadRequest.java index 1fd3cf2167..dcb8c1da4a 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogFileDownloadRequest.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogFileDownloadRequest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.worker.transportor; +package org.apache.dolphinscheduler.extract.common.transportor; import lombok.AllArgsConstructor; import lombok.Data; @@ -29,4 +29,5 @@ public class TaskInstanceLogFileDownloadRequest { private long taskInstanceId; private String taskInstanceLogAbsolutePath; + } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogFileDownloadResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogFileDownloadResponse.java similarity index 94% rename from dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogFileDownloadResponse.java rename to dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogFileDownloadResponse.java index ee805a35b5..ac1e5b0cfd 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogFileDownloadResponse.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogFileDownloadResponse.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.worker.transportor; +package org.apache.dolphinscheduler.extract.common.transportor; import lombok.AllArgsConstructor; import lombok.Data; @@ -27,4 +27,5 @@ import lombok.NoArgsConstructor; public class TaskInstanceLogFileDownloadResponse { private byte[] logBytes; + } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogPageQueryRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogPageQueryRequest.java similarity index 89% rename from dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogPageQueryRequest.java rename to dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogPageQueryRequest.java index 321353e5d3..2723ac5baa 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogPageQueryRequest.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogPageQueryRequest.java @@ -15,21 +15,24 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.worker.transportor; +package org.apache.dolphinscheduler.extract.common.transportor; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @Data -@NoArgsConstructor +@Builder @AllArgsConstructor +@NoArgsConstructor public class TaskInstanceLogPageQueryRequest { - private int taskInstanceId; + private Integer taskInstanceId; private String taskInstanceLogAbsolutePath; private int skipLineNum; private int limit; + } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogPageQueryResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogPageQueryResponse.java similarity index 94% rename from dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogPageQueryResponse.java rename to dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogPageQueryResponse.java index 1e0d481ddd..ac769d7677 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogPageQueryResponse.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogPageQueryResponse.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.worker.transportor; +package org.apache.dolphinscheduler.extract.common.transportor; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/pom.xml b/dolphinscheduler-extract/dolphinscheduler-extract-master/pom.xml index 04afc0b1de..955bcd0a12 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/pom.xml +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/pom.xml @@ -30,6 +30,11 @@ dolphinscheduler-extract-master + + org.apache.dolphinscheduler + dolphinscheduler-extract-common + ${project.version} + org.apache.dolphinscheduler dolphinscheduler-extract-base diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterLogService.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterLogService.java deleted file mode 100644 index a7f15b2ea8..0000000000 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterLogService.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.extract.master; - -import org.apache.dolphinscheduler.extract.base.RpcMethod; -import org.apache.dolphinscheduler.extract.base.RpcService; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadResponse; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryResponse; - -@RpcService -public interface IMasterLogService { - - @RpcMethod - LogicTaskInstanceLogFileDownloadResponse getLogicTaskInstanceWholeLogFileBytes(LogicTaskInstanceLogFileDownloadRequest logicTaskInstanceLogFileDownloadRequest); - - @RpcMethod - LogicTaskInstanceLogPageQueryResponse pageQueryLogicTaskInstanceLog(LogicTaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest); - - @RpcMethod - void removeLogicTaskInstanceLog(String taskInstanceLogAbsolutePath); -} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadRequest.java deleted file mode 100644 index 7fe4c5df24..0000000000 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadRequest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.extract.master.transportor; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@AllArgsConstructor -public class LogicTaskInstanceLogFileDownloadRequest { - - private long taskInstanceId; - - private String taskInstanceLogAbsolutePath; -} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadResponse.java deleted file mode 100644 index fe80526e55..0000000000 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadResponse.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.extract.master.transportor; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@AllArgsConstructor -public class LogicTaskInstanceLogFileDownloadResponse { - - private byte[] logBytes; -} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryRequest.java deleted file mode 100644 index 5a8d3da808..0000000000 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryRequest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.extract.master.transportor; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@AllArgsConstructor -public class LogicTaskInstanceLogPageQueryRequest { - - private long taskInstanceId; - - private String taskInstanceLogAbsolutePath; - - private int skipLineNum; - private int limit; -} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryResponse.java deleted file mode 100644 index 1d4a2b6b55..0000000000 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryResponse.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.extract.master.transportor; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@AllArgsConstructor -public class LogicTaskInstanceLogPageQueryResponse { - - private String logContent; - -} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-worker/pom.xml b/dolphinscheduler-extract/dolphinscheduler-extract-worker/pom.xml index da5fbd3b3f..0a42b73a1d 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-worker/pom.xml +++ b/dolphinscheduler-extract/dolphinscheduler-extract-worker/pom.xml @@ -30,6 +30,11 @@ dolphinscheduler-extract-worker + + org.apache.dolphinscheduler + dolphinscheduler-extract-common + ${project.version} + org.apache.dolphinscheduler dolphinscheduler-extract-base diff --git a/dolphinscheduler-extract/pom.xml b/dolphinscheduler-extract/pom.xml index dca4fd6623..09be31ebcc 100644 --- a/dolphinscheduler-extract/pom.xml +++ b/dolphinscheduler-extract/pom.xml @@ -34,6 +34,7 @@ dolphinscheduler-extract-master dolphinscheduler-extract-worker dolphinscheduler-extract-alert + dolphinscheduler-extract-common diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterLogServiceImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterLogServiceImpl.java index 6d908d16ac..0fa7cbffd9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterLogServiceImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterLogServiceImpl.java @@ -17,14 +17,17 @@ package org.apache.dolphinscheduler.server.master.rpc; +import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.LogUtils; -import org.apache.dolphinscheduler.extract.master.IMasterLogService; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadResponse; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryResponse; +import org.apache.dolphinscheduler.extract.common.ILogService; +import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest; +import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse; -import java.io.File; +import java.util.Collections; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -33,18 +36,18 @@ import org.springframework.stereotype.Service; @Slf4j @Service -public class MasterLogServiceImpl implements IMasterLogService { +public class MasterLogServiceImpl implements ILogService { @Override - public LogicTaskInstanceLogFileDownloadResponse getLogicTaskInstanceWholeLogFileBytes(LogicTaskInstanceLogFileDownloadRequest logicTaskInstanceLogFileDownloadRequest) { + public TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest logicTaskInstanceLogFileDownloadRequest) { byte[] bytes = LogUtils.getFileContentBytes(logicTaskInstanceLogFileDownloadRequest.getTaskInstanceLogAbsolutePath()); // todo: if file not exists, return error result - return new LogicTaskInstanceLogFileDownloadResponse(bytes); + return new TaskInstanceLogFileDownloadResponse(bytes); } @Override - public LogicTaskInstanceLogPageQueryResponse pageQueryLogicTaskInstanceLog(LogicTaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest) { + public TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest) { List lines = LogUtils.readPartFileContent( taskInstanceLogPageQueryRequest.getTaskInstanceLogAbsolutePath(), @@ -52,18 +55,16 @@ public class MasterLogServiceImpl implements IMasterLogService { taskInstanceLogPageQueryRequest.getLimit()); String logContent = LogUtils.rollViewLogLines(lines); - return new LogicTaskInstanceLogPageQueryResponse(logContent); + return new TaskInstanceLogPageQueryResponse(logContent); } @Override - public void removeLogicTaskInstanceLog(String taskInstanceLogAbsolutePath) { - File taskLogFile = new File(taskInstanceLogAbsolutePath); - try { - if (taskLogFile.exists()) { - taskLogFile.delete(); - } - } catch (Exception e) { - log.error("Remove LogicTaskInstanceLog error", e); - } + public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) { + return new GetAppIdResponse(Collections.emptyList()); + } + + @Override + public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) { + FileUtils.deleteFile(taskInstanceLogAbsolutePath); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java index 0dc1b16679..86515d032d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java @@ -25,9 +25,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; -import org.apache.dolphinscheduler.extract.worker.IWorkerLogService; -import org.apache.dolphinscheduler.extract.worker.transportor.GetAppIdRequest; -import org.apache.dolphinscheduler.extract.worker.transportor.GetAppIdResponse; +import org.apache.dolphinscheduler.extract.common.ILogService; +import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest; +import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; @@ -274,10 +274,10 @@ public class WorkerFailoverService { .create(); // only kill yarn/k8s job if exists , the local thread has exited log.info("TaskInstance failover begin kill the task related yarn or k8s job"); - IWorkerLogService iWorkerLogService = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(taskInstance.getHost(), IWorkerLogService.class); + ILogService iLogService = + SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class); GetAppIdResponse getAppIdResponse = - iWorkerLogService.getAppId(new GetAppIdRequest(taskInstance.getId(), taskInstance.getLogPath())); + iLogService.getAppId(new GetAppIdRequest(taskInstance.getId(), taskInstance.getLogPath())); ProcessUtils.killApplication(getAppIdResponse.getAppIds(), taskExecutionContext); } catch (Exception ex) { log.error("Kill yarn task error", ex); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 760ed1429a..d065d62f53 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -116,10 +116,9 @@ import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.DqRuleUtils; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; -import org.apache.dolphinscheduler.extract.master.IMasterLogService; +import org.apache.dolphinscheduler.extract.common.ILogService; import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener; import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent; -import org.apache.dolphinscheduler.extract.worker.IWorkerLogService; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -130,7 +129,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter; -import org.apache.dolphinscheduler.plugin.task.api.utils.TaskUtils; import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.exceptions.CronParseException; @@ -516,15 +514,9 @@ public class ProcessServiceImpl implements ProcessService { if (StringUtils.isEmpty(taskInstance.getHost()) || StringUtils.isEmpty(taskLogPath)) { continue; } - if (TaskUtils.isLogicTask(taskInstance.getTaskType())) { - IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(taskInstance.getHost(), IMasterLogService.class); - masterLogService.removeLogicTaskInstanceLog(taskLogPath); - } else { - IWorkerLogService iWorkerLogService = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(taskInstance.getHost(), IWorkerLogService.class); - iWorkerLogService.removeTaskInstanceLog(taskLogPath); - } + ILogService iLogService = + SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class); + iLogService.removeTaskInstanceLog(taskLogPath); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java index 405d79a0fb..24b5477cce 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java @@ -20,19 +20,19 @@ package org.apache.dolphinscheduler.server.worker.rpc; import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT; import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.LogUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; -import org.apache.dolphinscheduler.extract.worker.IWorkerLogService; -import org.apache.dolphinscheduler.extract.worker.transportor.GetAppIdRequest; -import org.apache.dolphinscheduler.extract.worker.transportor.GetAppIdResponse; -import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadRequest; -import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadResponse; -import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryRequest; -import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryResponse; +import org.apache.dolphinscheduler.extract.common.ILogService; +import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest; +import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest; +import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; -import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; -import java.io.File; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -41,11 +41,11 @@ import org.springframework.stereotype.Service; @Slf4j @Service -public class WorkerLogServiceImpl implements IWorkerLogService { +public class WorkerLogServiceImpl implements ILogService { @Override public TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest) { - byte[] bytes = org.apache.dolphinscheduler.common.utils.LogUtils + byte[] bytes = LogUtils .getFileContentBytes(taskInstanceLogFileDownloadRequest.getTaskInstanceLogAbsolutePath()); // todo: if file not exists, return error result return new TaskInstanceLogFileDownloadResponse(bytes); @@ -53,12 +53,12 @@ public class WorkerLogServiceImpl implements IWorkerLogService { @Override public TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest) { - List lines = org.apache.dolphinscheduler.common.utils.LogUtils.readPartFileContent( + List lines = LogUtils.readPartFileContent( taskInstanceLogPageQueryRequest.getTaskInstanceLogAbsolutePath(), taskInstanceLogPageQueryRequest.getSkipLineNum(), taskInstanceLogPageQueryRequest.getLimit()); - String logContent = org.apache.dolphinscheduler.common.utils.LogUtils.rollViewLogLines(lines); + String logContent = LogUtils.rollViewLogLines(lines); return new TaskInstanceLogPageQueryResponse(logContent); } @@ -68,20 +68,13 @@ public class WorkerLogServiceImpl implements IWorkerLogService { TaskExecutionContextCacheManager.getByTaskInstanceId(getAppIdRequest.getTaskInstanceId()); String appInfoPath = taskExecutionContext.getAppInfoPath(); String logPath = getAppIdRequest.getLogPath(); - List appIds = LogUtils.getAppIds(logPath, appInfoPath, + List appIds = org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils.getAppIds(logPath, appInfoPath, PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY)); return new GetAppIdResponse(appIds); } @Override public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) { - File taskLogFile = new File(taskInstanceLogAbsolutePath); - try { - if (taskLogFile.exists()) { - taskLogFile.delete(); - } - } catch (Exception e) { - log.error("Remove TaskInstanceLog error", e); - } + FileUtils.deleteFile(taskInstanceLogAbsolutePath); } }