diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java index 3800264f6b..88c715fab6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java @@ -32,6 +32,7 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestAttribute; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -43,6 +44,7 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; import springfox.documentation.annotations.ApiIgnore; /** @@ -82,7 +84,6 @@ public class LoggerController extends BaseController { return loggerService.queryLog(taskInstanceId, skipNum, limit); } - /** * download log file * @@ -107,4 +108,59 @@ public class LoggerController extends BaseController { .body(logBytes); } + /** + * query task log in specified project + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstanceId task instance id + * @param skipNum skip number + * @param limit limit + * @return task log content + */ + @ApiOperation(value = "queryLogInSpecifiedProject", notes = "QUERY_TASK_INSTANCE_LOG_IN_SPECIFIED_PROJECT_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"), + @ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "limit", value = "LIMIT", required = true, dataType = "Int", example = "100") + }) + @GetMapping(value = "/{projectCode}/detail") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_TASK_INSTANCE_LOG_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam(value = "taskInstanceId") int taskInstanceId, + @RequestParam(value = "skipLineNum") int skipNum, + @RequestParam(value = "limit") int limit) { + return returnDataList(loggerService.queryLog(loginUser, projectCode, taskInstanceId, skipNum, limit)); + } + + /** + * download log file + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstanceId task instance id + * @return log file content + */ + @ApiOperation(value = "downloadTaskLogInSpecifiedProject", notes = "DOWNLOAD_TASK_INSTANCE_LOG_IN_SPECIFIED_PROJECT_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"), + @ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required = true, dataType = "Int", example = "100") + }) + @GetMapping(value = "/{projectCode}/download-log") + @ResponseBody + @ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public ResponseEntity downloadTaskLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam(value = "taskInstanceId") int taskInstanceId) { + byte[] logBytes = loggerService.getLogBytes(loginUser, projectCode, taskInstanceId); + return ResponseEntity + .ok() + .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + System.currentTimeMillis() + ".log" + "\"") + .body(logBytes); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java index ef30a40303..b252522c7c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java @@ -18,6 +18,9 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.dao.entity.User; + +import java.util.Map; /** * logger service @@ -43,4 +46,25 @@ public interface LoggerService { */ byte[] getLogBytes(int taskInstId); + /** + * query log + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstId task instance id + * @param skipLineNum skip line number + * @param limit limit + * @return log string data + */ + Map queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit); + + /** + * get log bytes + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstId task instance id + * @return log byte array + */ + byte[] getLogBytes(User loginUser, long projectCode, int taskInstId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java index 0fd4befd77..847f87d82a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java @@ -20,10 +20,16 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.LoggerService; +import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -31,6 +37,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.lang.StringUtils; import java.nio.charset.StandardCharsets; +import java.util.Map; import java.util.Objects; import javax.annotation.PostConstruct; @@ -47,7 +54,7 @@ import com.google.common.primitives.Bytes; * logger service impl */ @Service -public class LoggerServiceImpl implements LoggerService { +public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService { private static final Logger logger = LoggerFactory.getLogger(LoggerServiceImpl.class); @@ -58,6 +65,15 @@ public class LoggerServiceImpl implements LoggerService { private LogClientService logClient; + @Autowired + ProjectMapper projectMapper; + + @Autowired + ProjectService projectService; + + @Autowired + TaskDefinitionMapper taskDefinitionMapper; + @PostConstruct public void init() { if (Objects.isNull(this.logClient)) { @@ -89,10 +105,117 @@ public class LoggerServiceImpl implements LoggerService { if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { return Result.error(Status.TASK_INSTANCE_NOT_FOUND); } + Result result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); + String log = queryLog(taskInstance,skipLineNum,limit); + result.setData(log); + return result; + } - String host = getHost(taskInstance.getHost()); - Result result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); + /** + * get log size + * + * @param taskInstId task instance id + * @return log byte array + */ + @Override + public byte[] getLogBytes(int taskInstId) { + TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); + if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { + throw new ServiceException("task instance is null or host is null"); + } + return getLogBytes(taskInstance); + } + + /** + * query log + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstId task instance id + * @param skipLineNum skip line number + * @param limit limit + * @return log string data + */ + @Override + @SuppressWarnings("unchecked") + public Map queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit) { + Project project = projectMapper.queryByCode(projectCode); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + // check whether the task instance can be found + TaskInstance task = processService.findTaskInstanceById(taskInstId); + if (task == null || StringUtils.isBlank(task.getHost())) { + putMsg(result, Status.TASK_INSTANCE_NOT_FOUND); + return result; + } + + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode()); + if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) { + putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstId); + return result; + } + String log = queryLog(task, skipLineNum, limit); + result.put(Constants.DATA_LIST, log); + return result; + } + + /** + * get log bytes + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstId task instance id + * @return log byte array + */ + @Override + public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) { + Project project = projectMapper.queryByCode(projectCode); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + throw new ServiceException("user has no permission"); + } + // check whether the task instance can be found + TaskInstance task = processService.findTaskInstanceById(taskInstId); + if (task == null || StringUtils.isBlank(task.getHost())) { + throw new ServiceException("task instance is null or host is null"); + } + + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode()); + if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) { + throw new ServiceException("task instance does not exist in project"); + } + return getLogBytes(task); + } + + /** + * get host + * + * @param address address + * @return old version return true ,otherwise return false + */ + private String getHost(String address) { + if (Boolean.TRUE.equals(Host.isOldVersion(address))) { + return address; + } + return Host.of(address).getIp(); + } + + /** + * query log + * + * @param taskInstance task instance + * @param skipLineNum skip line number + * @param limit limit + * @return log string data + */ + private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) { + + String host = getHost(taskInstance.getHost()); logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(), PropertyUtils.getInt(Constants.RPC_PORT, 50051)); @@ -109,23 +232,16 @@ public class LoggerServiceImpl implements LoggerService { log.append(logClient .rollViewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath(), skipLineNum, limit)); - result.setData(log.toString()); - return result; + return log.toString(); } - /** - * get log size + * get log bytes * - * @param taskInstId task instance id + * @param taskInstance task instance * @return log byte array */ - @Override - public byte[] getLogBytes(int taskInstId) { - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); - if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { - throw new ServiceException("task instance is null or host is null"); - } + private byte[] getLogBytes(TaskInstance taskInstance) { String host = getHost(taskInstance.getHost()); byte[] head = String.format(LOG_HEAD_FORMAT, taskInstance.getLogPath(), @@ -134,17 +250,4 @@ public class LoggerServiceImpl implements LoggerService { return Bytes.concat(head, logClient.getLogBytes(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath())); } - - /** - * get host - * - * @param address address - * @return old version return true ,otherwise return false - */ - private String getHost(String address) { - if (Boolean.TRUE.equals(Host.isOldVersion(address))) { - return address; - } - return Host.of(address).getIp(); - } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java index bd8aa72fef..bc46d248be 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java @@ -20,9 +20,20 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -51,6 +62,15 @@ public class LoggerServiceTest { @Mock private ProcessService processService; + @Mock + private ProjectMapper projectMapper; + + @Mock + private ProjectService projectService; + + @Mock + private TaskDefinitionMapper taskDefinitionMapper; + @Before public void init() { this.loggerService.init(); @@ -113,9 +133,87 @@ public class LoggerServiceTest { } + @Test + public void testQueryLogInSpecifiedProject() { + long projectCode = 1L; + Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); + Project project = getProject(projectCode); + + User loginUser = new User(); + loginUser.setId(-1); + loginUser.setUserType(UserType.GENERAL_USER); + Map result = new HashMap<>(); + putMsg(result, Status.SUCCESS, projectCode); + TaskInstance taskInstance = new TaskInstance(); + Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setProjectCode(projectCode); + taskDefinition.setCode(1L); + //SUCCESS + taskInstance.setTaskCode(1L); + taskInstance.setId(1); + taskInstance.setHost("127.0.0.1:8080"); + taskInstance.setLogPath("/temp/log"); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); + Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); + result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1); + Assert.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); + } + + @Test + public void testGetLogBytesInSpecifiedProject() { + long projectCode = 1L; + Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); + Project project = getProject(projectCode); + + User loginUser = new User(); + loginUser.setId(-1); + loginUser.setUserType(UserType.GENERAL_USER); + Map result = new HashMap<>(); + putMsg(result, Status.SUCCESS, projectCode); + TaskInstance taskInstance = new TaskInstance(); + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setProjectCode(projectCode); + taskDefinition.setCode(1L); + //SUCCESS + taskInstance.setTaskCode(1L); + taskInstance.setId(1); + taskInstance.setHost("127.0.0.1:8080"); + taskInstance.setLogPath("/temp/log"); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); + Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); + loggerService.getLogBytes(loginUser, projectCode, 1); + } + + @After public void close() { this.loggerService.close(); } + /** + * get mock Project + * + * @param projectCode projectCode + * @return Project + */ + private Project getProject(long projectCode) { + Project project = new Project(); + project.setCode(projectCode); + project.setId(1); + project.setName("test"); + project.setUserId(1); + return project; + } + + private void putMsg(Map result, Status status, Object... statusParams) { + result.put(Constants.STATUS, status); + if (statusParams != null && statusParams.length > 0) { + result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams)); + } else { + result.put(Constants.MSG, status.getMsg()); + } + } } \ No newline at end of file