Browse Source

[Feature][dolphinscheduler-api] add queryLog and downloadTaskLog with projectCode #7153 (#7192)

* [Feature][dolphinscheduler-api] add queryLog and downloadTaskLog with projectCode #7153

* [Feature][dolphinscheduler-api] add queryLog and downloadTaskLog with projectCode #7153

* [Feature][dolphinscheduler-api] add queryLog and downloadTaskLog with projectCode #7153

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
3.0.0/version-upgrade
zwZjut 3 years ago committed by GitHub
parent
commit
05aef279c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 58
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
  2. 24
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
  3. 157
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
  4. 98
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java

58
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java

@ -32,6 +32,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestAttribute; import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
@ -43,6 +44,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore; import springfox.documentation.annotations.ApiIgnore;
/** /**
@ -82,7 +84,6 @@ public class LoggerController extends BaseController {
return loggerService.queryLog(taskInstanceId, skipNum, limit); return loggerService.queryLog(taskInstanceId, skipNum, limit);
} }
/** /**
* download log file * download log file
* *
@ -107,4 +108,59 @@ public class LoggerController extends BaseController {
.body(logBytes); .body(logBytes);
} }
/**
* query task log in specified project
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstanceId task instance id
* @param skipNum skip number
* @param limit limit
* @return task log content
*/
@ApiOperation(value = "queryLogInSpecifiedProject", notes = "QUERY_TASK_INSTANCE_LOG_IN_SPECIFIED_PROJECT_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "limit", value = "LIMIT", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/{projectCode}/detail")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_INSTANCE_LOG_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<String> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
return returnDataList(loggerService.queryLog(loginUser, projectCode, taskInstanceId, skipNum, limit));
}
/**
* download log file
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstanceId task instance id
* @return log file content
*/
@ApiOperation(value = "downloadTaskLogInSpecifiedProject", notes = "DOWNLOAD_TASK_INSTANCE_LOG_IN_SPECIFIED_PROJECT_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/{projectCode}/download-log")
@ResponseBody
@ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public ResponseEntity downloadTaskLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskInstanceId") int taskInstanceId) {
byte[] logBytes = loggerService.getLogBytes(loginUser, projectCode, taskInstanceId);
return ResponseEntity
.ok()
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + System.currentTimeMillis() + ".log" + "\"")
.body(logBytes);
}
} }

24
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java

@ -18,6 +18,9 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Map;
/** /**
* logger service * logger service
@ -43,4 +46,25 @@ public interface LoggerService {
*/ */
byte[] getLogBytes(int taskInstId); byte[] getLogBytes(int taskInstId);
/**
* query log
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
Map<String, Object> queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit);
/**
* get log bytes
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @return log byte array
*/
byte[] getLogBytes(User loginUser, long projectCode, int taskInstId);
} }

157
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java

@ -20,10 +20,16 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.LoggerService; import org.apache.dolphinscheduler.api.service.LoggerService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -31,6 +37,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -47,7 +54,7 @@ import com.google.common.primitives.Bytes;
* logger service impl * logger service impl
*/ */
@Service @Service
public class LoggerServiceImpl implements LoggerService { public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService {
private static final Logger logger = LoggerFactory.getLogger(LoggerServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(LoggerServiceImpl.class);
@ -58,6 +65,15 @@ public class LoggerServiceImpl implements LoggerService {
private LogClientService logClient; private LogClientService logClient;
@Autowired
ProjectMapper projectMapper;
@Autowired
ProjectService projectService;
@Autowired
TaskDefinitionMapper taskDefinitionMapper;
@PostConstruct @PostConstruct
public void init() { public void init() {
if (Objects.isNull(this.logClient)) { if (Objects.isNull(this.logClient)) {
@ -89,10 +105,117 @@ public class LoggerServiceImpl implements LoggerService {
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
return Result.error(Status.TASK_INSTANCE_NOT_FOUND); return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
} }
Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
String log = queryLog(taskInstance,skipLineNum,limit);
result.setData(log);
return result;
}
String host = getHost(taskInstance.getHost());
Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); /**
* get log size
*
* @param taskInstId task instance id
* @return log byte array
*/
@Override
public byte[] getLogBytes(int taskInstId) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
throw new ServiceException("task instance is null or host is null");
}
return getLogBytes(taskInstance);
}
/**
* query log
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
@Override
@SuppressWarnings("unchecked")
public Map<String, Object> queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
// check whether the task instance can be found
TaskInstance task = processService.findTaskInstanceById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode());
if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstId);
return result;
}
String log = queryLog(task, skipLineNum, limit);
result.put(Constants.DATA_LIST, log);
return result;
}
/**
* get log bytes
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @return log byte array
*/
@Override
public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
throw new ServiceException("user has no permission");
}
// check whether the task instance can be found
TaskInstance task = processService.findTaskInstanceById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) {
throw new ServiceException("task instance is null or host is null");
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode());
if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
throw new ServiceException("task instance does not exist in project");
}
return getLogBytes(task);
}
/**
* get host
*
* @param address address
* @return old version return true ,otherwise return false
*/
private String getHost(String address) {
if (Boolean.TRUE.equals(Host.isOldVersion(address))) {
return address;
}
return Host.of(address).getIp();
}
/**
* query log
*
* @param taskInstance task instance
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
String host = getHost(taskInstance.getHost());
logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(), logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(),
PropertyUtils.getInt(Constants.RPC_PORT, 50051)); PropertyUtils.getInt(Constants.RPC_PORT, 50051));
@ -109,23 +232,16 @@ public class LoggerServiceImpl implements LoggerService {
log.append(logClient log.append(logClient
.rollViewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath(), skipLineNum, limit)); .rollViewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath(), skipLineNum, limit));
result.setData(log.toString()); return log.toString();
return result;
} }
/** /**
* get log size * get log bytes
* *
* @param taskInstId task instance id * @param taskInstance task instance
* @return log byte array * @return log byte array
*/ */
@Override private byte[] getLogBytes(TaskInstance taskInstance) {
public byte[] getLogBytes(int taskInstId) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
throw new ServiceException("task instance is null or host is null");
}
String host = getHost(taskInstance.getHost()); String host = getHost(taskInstance.getHost());
byte[] head = String.format(LOG_HEAD_FORMAT, byte[] head = String.format(LOG_HEAD_FORMAT,
taskInstance.getLogPath(), taskInstance.getLogPath(),
@ -134,17 +250,4 @@ public class LoggerServiceImpl implements LoggerService {
return Bytes.concat(head, return Bytes.concat(head,
logClient.getLogBytes(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath())); logClient.getLogBytes(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath()));
} }
/**
* get host
*
* @param address address
* @return old version return true ,otherwise return false
*/
private String getHost(String address) {
if (Boolean.TRUE.equals(Host.isOldVersion(address))) {
return address;
}
return Host.of(address).getIp();
}
} }

98
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java

@ -20,9 +20,20 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -51,6 +62,15 @@ public class LoggerServiceTest {
@Mock @Mock
private ProcessService processService; private ProcessService processService;
@Mock
private ProjectMapper projectMapper;
@Mock
private ProjectService projectService;
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
@Before @Before
public void init() { public void init() {
this.loggerService.init(); this.loggerService.init();
@ -113,9 +133,87 @@ public class LoggerServiceTest {
} }
@Test
public void testQueryLogInSpecifiedProject() {
long projectCode = 1L;
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Project project = getProject(projectCode);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
TaskInstance taskInstance = new TaskInstance();
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setCode(1L);
//SUCCESS
taskInstance.setTaskCode(1L);
taskInstance.setId(1);
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1);
Assert.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
}
@Test
public void testGetLogBytesInSpecifiedProject() {
long projectCode = 1L;
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Project project = getProject(projectCode);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
TaskInstance taskInstance = new TaskInstance();
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setCode(1L);
//SUCCESS
taskInstance.setTaskCode(1L);
taskInstance.setId(1);
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
loggerService.getLogBytes(loginUser, projectCode, 1);
}
@After @After
public void close() { public void close() {
this.loggerService.close(); this.loggerService.close();
} }
/**
* get mock Project
*
* @param projectCode projectCode
* @return Project
*/
private Project getProject(long projectCode) {
Project project = new Project();
project.setCode(projectCode);
project.setId(1);
project.setName("test");
project.setUserId(1);
return project;
}
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
result.put(Constants.STATUS, status);
if (statusParams != null && statusParams.length > 0) {
result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams));
} else {
result.put(Constants.MSG, status.getMsg());
}
}
} }
Loading…
Cancel
Save