Browse Source

[Improvement-3369][api] Introduce logger service interface for clear code (#3437)

* [Improvement][api] Introduce logger service interface for clear code

* Remove the code smell
pull/3/MERGE
Yichao Yang 4 years ago committed by GitHub
parent
commit
dce403d062
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
  2. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandler.java
  3. 93
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
  4. 146
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
  5. 44
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
  6. 60
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  7. 33
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

25
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java

@ -17,25 +17,34 @@
package org.apache.dolphinscheduler.api.controller; package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_INSTANCE_LOG_ERROR;
import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.LoggerService; import org.apache.dolphinscheduler.api.service.LoggerService;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.GetMapping;
import springfox.documentation.annotations.ApiIgnore; import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import static org.apache.dolphinscheduler.api.enums.Status.*; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import springfox.documentation.annotations.ApiIgnore;
/** /**
@ -70,7 +79,7 @@ public class LoggerController extends BaseController {
@GetMapping(value = "/detail") @GetMapping(value = "/detail")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_INSTANCE_LOG_ERROR) @ApiException(QUERY_TASK_INSTANCE_LOG_ERROR)
public Result queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result<String> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskInstanceId") int taskInstanceId, @RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum, @RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) { @RequestParam(value = "limit") int limit) {

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandler.java

@ -18,17 +18,18 @@ package org.apache.dolphinscheduler.api.exceptions;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.method.HandlerMethod; import org.springframework.web.method.HandlerMethod;
/** /**
* Exception Handler * Exception Handler
*/ */
@ControllerAdvice @RestControllerAdvice
@ResponseBody @ResponseBody
public class ApiExceptionHandler { public class ApiExceptionHandler {

93
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java

@ -16,45 +16,12 @@
*/ */
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import java.nio.charset.StandardCharsets;
import javax.annotation.PreDestroy;
import org.apache.commons.lang.ArrayUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/** /**
* log service * log service
*/ */
@Service public interface LoggerService {
public class LoggerService {
private static final Logger logger = LoggerFactory.getLogger(LoggerService.class);
private static final String LOG_HEAD_FORMAT = "[LOG-PATH]: %s, [HOST]: %s%s";
@Autowired
private ProcessService processService;
private final LogClientService logClient;
public LoggerService() {
logClient = new LogClientService();
}
@PreDestroy
public void close() {
logClient.close();
}
/** /**
* view log * view log
@ -64,36 +31,7 @@ public class LoggerService {
* @param limit limit * @param limit limit
* @return log string data * @return log string data
*/ */
public Result queryLog(int taskInstId, int skipLineNum, int limit) { Result<String> queryLog(int taskInstId, int skipLineNum, int limit);
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
}
String host = getHost(taskInstance.getHost());
Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(),
Constants.RPC_PORT);
StringBuilder log = new StringBuilder();
if (skipLineNum == 0) {
String head = String.format(LOG_HEAD_FORMAT,
taskInstance.getLogPath(),
host,
Constants.SYSTEM_LINE_SEPARATOR);
log.append(head);
}
log.append(logClient
.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(), skipLineNum, limit));
result.setData(log);
return result;
}
/** /**
@ -102,31 +40,6 @@ public class LoggerService {
* @param taskInstId task instance id * @param taskInstId task instance id
* @return log byte array * @return log byte array
*/ */
public byte[] getLogBytes(int taskInstId) { byte[] getLogBytes(int taskInstId);
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
throw new RuntimeException("task instance is null or host is null");
}
String host = getHost(taskInstance.getHost());
byte[] head = String.format(LOG_HEAD_FORMAT,
taskInstance.getLogPath(),
host,
Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8);
return ArrayUtils.addAll(head,
logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath()));
}
/**
* get host
*
* @param address address
* @return old version return true ,otherwise return false
*/
private String getHost(String address) {
if (Host.isOldVersion(address)) {
return address;
}
return Host.of(address).getIp();
}
} }

146
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.LoggerService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang.ArrayUtils;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* log service
*/
@Service
public class LoggerServiceImpl implements LoggerService {
private static final Logger logger = LoggerFactory.getLogger(LoggerServiceImpl.class);
private static final String LOG_HEAD_FORMAT = "[LOG-PATH]: %s, [HOST]: %s%s";
@Autowired
private ProcessService processService;
private LogClientService logClient;
@PostConstruct
public void init() {
if (Objects.isNull(this.logClient)) {
this.logClient = new LogClientService();
}
}
@PreDestroy
public void close() {
if (Objects.nonNull(this.logClient) && this.logClient.isRunning()) {
logClient.close();
}
}
/**
* view log
*
* @param taskInstId task instance id
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
@SuppressWarnings("unchecked")
public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
}
String host = getHost(taskInstance.getHost());
Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(),
Constants.RPC_PORT);
StringBuilder log = new StringBuilder();
if (skipLineNum == 0) {
String head = String.format(LOG_HEAD_FORMAT,
taskInstance.getLogPath(),
host,
Constants.SYSTEM_LINE_SEPARATOR);
log.append(head);
}
log.append(logClient
.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(), skipLineNum, limit));
result.setData(log.toString());
return result;
}
/**
* get log size
*
* @param taskInstId task instance id
* @return log byte array
*/
public byte[] getLogBytes(int taskInstId) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
throw new ServiceException("task instance is null or host is null");
}
String host = getHost(taskInstance.getHost());
byte[] head = String.format(LOG_HEAD_FORMAT,
taskInstance.getLogPath(),
host,
Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8);
return ArrayUtils.addAll(head,
logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath()));
}
/**
* get host
*
* @param address address
* @return old version return true ,otherwise return false
*/
private String getHost(String address) {
if (Boolean.TRUE.equals(Host.isOldVersion(address))) {
return address;
}
return Host.of(address).getIp();
}
}

44
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java

@ -17,10 +17,14 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
@ -32,25 +36,30 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
@PrepareForTest({LoggerService.class}) @PrepareForTest({LoggerServiceImpl.class})
public class LoggerServiceTest { public class LoggerServiceTest {
private static final Logger logger = LoggerFactory.getLogger(LoggerServiceTest.class); private static final Logger logger = LoggerFactory.getLogger(LoggerServiceTest.class);
@InjectMocks @InjectMocks
private LoggerService loggerService; private LoggerServiceImpl loggerService;
@Mock @Mock
private ProcessService processService; private ProcessService processService;
@Before
public void init() {
this.loggerService.init();
}
@Test @Test
public void testQueryDataSourceList(){ public void testQueryDataSourceList() {
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Result result = loggerService.queryLog(2,1,1); Result result = loggerService.queryLog(2, 1, 1);
//TASK_INSTANCE_NOT_FOUND //TASK_INSTANCE_NOT_FOUND
Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(),result.getCode().intValue()); Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue());
try { try {
//HOST NOT FOUND OR ILLEGAL //HOST NOT FOUND OR ILLEGAL
@ -59,36 +68,36 @@ public class LoggerServiceTest {
Assert.assertTrue(true); Assert.assertTrue(true);
logger.error("testQueryDataSourceList error {}", e.getMessage()); logger.error("testQueryDataSourceList error {}", e.getMessage());
} }
Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(),result.getCode().intValue()); Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue());
//SUCCESS //SUCCESS
taskInstance.setHost("127.0.0.1:8080"); taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log"); taskInstance.setLogPath("/temp/log");
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
result = loggerService.queryLog(1,1,1); result = loggerService.queryLog(1, 1, 1);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
} }
@Test @Test
public void testGetLogBytes(){ public void testGetLogBytes() {
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
//task instance is null //task instance is null
try{ try {
loggerService.getLogBytes(2); loggerService.getLogBytes(2);
}catch (RuntimeException e){ } catch (RuntimeException e) {
Assert.assertTrue(true); Assert.assertTrue(true);
logger.error("testGetLogBytes error: {}","task instance is null"); logger.error("testGetLogBytes error: {}", "task instance is null");
} }
//task instance host is null //task instance host is null
try{ try {
loggerService.getLogBytes(1); loggerService.getLogBytes(1);
}catch (RuntimeException e){ } catch (RuntimeException e) {
Assert.assertTrue(true); Assert.assertTrue(true);
logger.error("testGetLogBytes error: {}","task instance host is null"); logger.error("testGetLogBytes error: {}", "task instance host is null");
} }
//success //success
@ -100,4 +109,9 @@ public class LoggerServiceTest {
} }
@After
public void close() {
this.loggerService.close();
}
} }

60
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -16,16 +16,43 @@
*/ */
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.io.IOException;
import java.text.MessageFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -33,22 +60,13 @@ import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import java.text.MessageFormat;
import java.text.ParseException;
import java.util.*;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.Silent.class) @RunWith(MockitoJUnitRunner.Silent.class)
@SpringBootTest(classes = ApiApplicationServer.class) @SpringBootTest(classes = ApiApplicationServer.class)
public class ProcessInstanceServiceTest { public class ProcessInstanceServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceServiceTest.class);
@InjectMocks @InjectMocks
ProcessInstanceService processInstanceService; ProcessInstanceService processInstanceService;
@ -78,9 +96,7 @@ public class ProcessInstanceServiceTest {
TaskInstanceMapper taskInstanceMapper; TaskInstanceMapper taskInstanceMapper;
@Mock @Mock
LoggerService loggerService; LoggerServiceImpl loggerService;
@Mock @Mock
UsersService usersService; UsersService usersService;
@ -153,16 +169,16 @@ public class ProcessInstanceServiceTest {
User loginUser = getAdminUser(); User loginUser = getAdminUser();
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName); putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
int size=10; int size = 10;
String startTime="2020-01-01 00:00:00"; String startTime = "2020-01-01 00:00:00";
String endTime="2020-08-02 00:00:00"; String endTime = "2020-08-02 00:00:00";
Date start = DateUtils.getScheduleDate(startTime); Date start = DateUtils.getScheduleDate(startTime);
Date end = DateUtils.getScheduleDate(endTime); Date end = DateUtils.getScheduleDate(endTime);
//project auth fail //project auth fail
when(projectMapper.queryByName(projectName)).thenReturn(null); when(projectMapper.queryByName(projectName)).thenReturn(null);
when(projectService.checkProjectAndAuth(loginUser, null, projectName)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, null, projectName)).thenReturn(result);
Map<String, Object> proejctAuthFailRes = processInstanceService.queryTopNLongestRunningProcessInstance(loginUser,projectName,size,startTime,endTime); Map<String, Object> proejctAuthFailRes = processInstanceService.queryTopNLongestRunningProcessInstance(loginUser, projectName, size, startTime, endTime);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, proejctAuthFailRes.get(Constants.STATUS)); Assert.assertEquals(Status.PROJECT_NOT_FOUNT, proejctAuthFailRes.get(Constants.STATUS));
//project auth success //project auth success
@ -176,7 +192,7 @@ public class ProcessInstanceServiceTest {
when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser); when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId()); when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId());
when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser); when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
Map<String, Object> successRes = processInstanceService.queryTopNLongestRunningProcessInstance(loginUser,projectName,size,startTime,endTime); Map<String, Object> successRes = processInstanceService.queryTopNLongestRunningProcessInstance(loginUser, projectName, size, startTime, endTime);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
} }

33
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

@ -16,13 +16,20 @@
*/ */
package org.apache.dolphinscheduler.service.log; package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.*; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -38,6 +45,8 @@ public class LogClientService {
private final NettyRemotingClient client; private final NettyRemotingClient client;
private volatile boolean isRunning;
/** /**
* request time out * request time out
*/ */
@ -50,6 +59,7 @@ public class LogClientService {
this.clientConfig = new NettyClientConfig(); this.clientConfig = new NettyClientConfig();
this.clientConfig.setWorkerThreads(4); this.clientConfig.setWorkerThreads(4);
this.client = new NettyRemotingClient(clientConfig); this.client = new NettyRemotingClient(clientConfig);
this.isRunning = true;
} }
/** /**
@ -57,11 +67,13 @@ public class LogClientService {
*/ */
public void close() { public void close() {
this.client.close(); this.client.close();
this.isRunning = false;
logger.info("logger client closed"); logger.info("logger client closed");
} }
/** /**
* roll view log * roll view log
*
* @param host host * @param host host
* @param port port * @param port port
* @param path path * @param path path
@ -69,7 +81,7 @@ public class LogClientService {
* @param limit limit * @param limit limit
* @return log content * @return log content
*/ */
public String rollViewLog(String host, int port, String path,int skipLineNum,int limit) { public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {
logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit); logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit); RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
String result = ""; String result = "";
@ -77,7 +89,7 @@ public class LogClientService {
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if(response != null){ if (response != null) {
RollViewLogResponseCommand rollReviewLog = JsonSerializer.deserialize( RollViewLogResponseCommand rollReviewLog = JsonSerializer.deserialize(
response.getBody(), RollViewLogResponseCommand.class); response.getBody(), RollViewLogResponseCommand.class);
return rollReviewLog.getMsg(); return rollReviewLog.getMsg();
@ -92,6 +104,7 @@ public class LogClientService {
/** /**
* view log * view log
*
* @param host host * @param host host
* @param port port * @param port port
* @param path path * @param path path
@ -105,7 +118,7 @@ public class LogClientService {
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if(response != null){ if (response != null) {
ViewLogResponseCommand viewLog = JsonSerializer.deserialize( ViewLogResponseCommand viewLog = JsonSerializer.deserialize(
response.getBody(), ViewLogResponseCommand.class); response.getBody(), ViewLogResponseCommand.class);
return viewLog.getMsg(); return viewLog.getMsg();
@ -120,6 +133,7 @@ public class LogClientService {
/** /**
* get log size * get log size
*
* @param host host * @param host host
* @param port port * @param port port
* @param path log path * @param path log path
@ -133,7 +147,7 @@ public class LogClientService {
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if(response != null){ if (response != null) {
GetLogBytesResponseCommand getLog = JsonSerializer.deserialize( GetLogBytesResponseCommand getLog = JsonSerializer.deserialize(
response.getBody(), GetLogBytesResponseCommand.class); response.getBody(), GetLogBytesResponseCommand.class);
return getLog.getData(); return getLog.getData();
@ -149,6 +163,7 @@ public class LogClientService {
/** /**
* remove task log * remove task log
*
* @param host host * @param host host
* @param port port * @param port port
* @param path path * @param path path
@ -162,7 +177,7 @@ public class LogClientService {
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if(response != null){ if (response != null) {
RemoveTaskLogResponseCommand taskLogResponse = JsonSerializer.deserialize( RemoveTaskLogResponseCommand taskLogResponse = JsonSerializer.deserialize(
response.getBody(), RemoveTaskLogResponseCommand.class); response.getBody(), RemoveTaskLogResponseCommand.class);
return taskLogResponse.getStatus(); return taskLogResponse.getStatus();
@ -174,4 +189,8 @@ public class LogClientService {
} }
return result; return result;
} }
public boolean isRunning() {
return isRunning;
}
} }
Loading…
Cancel
Save