@ -20,10 +20,16 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.api.exceptions.ServiceException ;
import org.apache.dolphinscheduler.api.exceptions.ServiceException ;
import org.apache.dolphinscheduler.api.service.LoggerService ;
import org.apache.dolphinscheduler.api.service.LoggerService ;
import org.apache.dolphinscheduler.api.service.ProjectService ;
import org.apache.dolphinscheduler.api.utils.Result ;
import org.apache.dolphinscheduler.api.utils.Result ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.utils.PropertyUtils ;
import org.apache.dolphinscheduler.common.utils.PropertyUtils ;
import org.apache.dolphinscheduler.dao.entity.Project ;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition ;
import org.apache.dolphinscheduler.dao.entity.TaskInstance ;
import org.apache.dolphinscheduler.dao.entity.TaskInstance ;
import org.apache.dolphinscheduler.dao.entity.User ;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper ;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper ;
import org.apache.dolphinscheduler.remote.utils.Host ;
import org.apache.dolphinscheduler.remote.utils.Host ;
import org.apache.dolphinscheduler.service.log.LogClientService ;
import org.apache.dolphinscheduler.service.log.LogClientService ;
import org.apache.dolphinscheduler.service.process.ProcessService ;
import org.apache.dolphinscheduler.service.process.ProcessService ;
@ -31,6 +37,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang.StringUtils ;
import org.apache.commons.lang.StringUtils ;
import java.nio.charset.StandardCharsets ;
import java.nio.charset.StandardCharsets ;
import java.util.Map ;
import java.util.Objects ;
import java.util.Objects ;
import javax.annotation.PostConstruct ;
import javax.annotation.PostConstruct ;
@ -47,7 +54,7 @@ import com.google.common.primitives.Bytes;
* logger service impl
* logger service impl
* /
* /
@Service
@Service
public class LoggerServiceImpl implements LoggerService {
public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService {
private static final Logger logger = LoggerFactory . getLogger ( LoggerServiceImpl . class ) ;
private static final Logger logger = LoggerFactory . getLogger ( LoggerServiceImpl . class ) ;
@ -58,6 +65,15 @@ public class LoggerServiceImpl implements LoggerService {
private LogClientService logClient ;
private LogClientService logClient ;
@Autowired
ProjectMapper projectMapper ;
@Autowired
ProjectService projectService ;
@Autowired
TaskDefinitionMapper taskDefinitionMapper ;
@PostConstruct
@PostConstruct
public void init ( ) {
public void init ( ) {
if ( Objects . isNull ( this . logClient ) ) {
if ( Objects . isNull ( this . logClient ) ) {
@ -89,10 +105,117 @@ public class LoggerServiceImpl implements LoggerService {
if ( taskInstance = = null | | StringUtils . isBlank ( taskInstance . getHost ( ) ) ) {
if ( taskInstance = = null | | StringUtils . isBlank ( taskInstance . getHost ( ) ) ) {
return Result . error ( Status . TASK_INSTANCE_NOT_FOUND ) ;
return Result . error ( Status . TASK_INSTANCE_NOT_FOUND ) ;
}
}
Result < String > result = new Result < > ( Status . SUCCESS . getCode ( ) , Status . SUCCESS . getMsg ( ) ) ;
String log = queryLog ( taskInstance , skipLineNum , limit ) ;
result . setData ( log ) ;
return result ;
}
String host = getHost ( taskInstance . getHost ( ) ) ;
Result < String > result = new Result < > ( Status . SUCCESS . getCode ( ) , Status . SUCCESS . getMsg ( ) ) ;
/ * *
* get log size
*
* @param taskInstId task instance id
* @return log byte array
* /
@Override
public byte [ ] getLogBytes ( int taskInstId ) {
TaskInstance taskInstance = processService . findTaskInstanceById ( taskInstId ) ;
if ( taskInstance = = null | | StringUtils . isBlank ( taskInstance . getHost ( ) ) ) {
throw new ServiceException ( "task instance is null or host is null" ) ;
}
return getLogBytes ( taskInstance ) ;
}
/ * *
* query log
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
* /
@Override
@SuppressWarnings ( "unchecked" )
public Map < String , Object > queryLog ( User loginUser , long projectCode , int taskInstId , int skipLineNum , int limit ) {
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
// check whether the task instance can be found
TaskInstance task = processService . findTaskInstanceById ( taskInstId ) ;
if ( task = = null | | StringUtils . isBlank ( task . getHost ( ) ) ) {
putMsg ( result , Status . TASK_INSTANCE_NOT_FOUND ) ;
return result ;
}
TaskDefinition taskDefinition = taskDefinitionMapper . queryByCode ( task . getTaskCode ( ) ) ;
if ( taskDefinition ! = null & & projectCode ! = taskDefinition . getProjectCode ( ) ) {
putMsg ( result , Status . TASK_INSTANCE_NOT_FOUND , taskInstId ) ;
return result ;
}
String log = queryLog ( task , skipLineNum , limit ) ;
result . put ( Constants . DATA_LIST , log ) ;
return result ;
}
/ * *
* get log bytes
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @return log byte array
* /
@Override
public byte [ ] getLogBytes ( User loginUser , long projectCode , int taskInstId ) {
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
throw new ServiceException ( "user has no permission" ) ;
}
// check whether the task instance can be found
TaskInstance task = processService . findTaskInstanceById ( taskInstId ) ;
if ( task = = null | | StringUtils . isBlank ( task . getHost ( ) ) ) {
throw new ServiceException ( "task instance is null or host is null" ) ;
}
TaskDefinition taskDefinition = taskDefinitionMapper . queryByCode ( task . getTaskCode ( ) ) ;
if ( taskDefinition ! = null & & projectCode ! = taskDefinition . getProjectCode ( ) ) {
throw new ServiceException ( "task instance does not exist in project" ) ;
}
return getLogBytes ( task ) ;
}
/ * *
* get host
*
* @param address address
* @return old version return true , otherwise return false
* /
private String getHost ( String address ) {
if ( Boolean . TRUE . equals ( Host . isOldVersion ( address ) ) ) {
return address ;
}
return Host . of ( address ) . getIp ( ) ;
}
/ * *
* query log
*
* @param taskInstance task instance
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
* /
private String queryLog ( TaskInstance taskInstance , int skipLineNum , int limit ) {
String host = getHost ( taskInstance . getHost ( ) ) ;
logger . info ( "log host : {} , logPath : {} , logServer port : {}" , host , taskInstance . getLogPath ( ) ,
logger . info ( "log host : {} , logPath : {} , logServer port : {}" , host , taskInstance . getLogPath ( ) ,
PropertyUtils . getInt ( Constants . RPC_PORT , 50051 ) ) ;
PropertyUtils . getInt ( Constants . RPC_PORT , 50051 ) ) ;
@ -109,23 +232,16 @@ public class LoggerServiceImpl implements LoggerService {
log . append ( logClient
log . append ( logClient
. rollViewLog ( host , PropertyUtils . getInt ( Constants . RPC_PORT , 50051 ) , taskInstance . getLogPath ( ) , skipLineNum , limit ) ) ;
. rollViewLog ( host , PropertyUtils . getInt ( Constants . RPC_PORT , 50051 ) , taskInstance . getLogPath ( ) , skipLineNum , limit ) ) ;
result . setData ( log . toString ( ) ) ;
return log . toString ( ) ;
return result ;
}
}
/ * *
/ * *
* get log size
* get log byte s
*
*
* @param taskInstId task instance id
* @param taskInstance task instance
* @return log byte array
* @return log byte array
* /
* /
@Override
private byte [ ] getLogBytes ( TaskInstance taskInstance ) {
public byte [ ] getLogBytes ( int taskInstId ) {
TaskInstance taskInstance = processService . findTaskInstanceById ( taskInstId ) ;
if ( taskInstance = = null | | StringUtils . isBlank ( taskInstance . getHost ( ) ) ) {
throw new ServiceException ( "task instance is null or host is null" ) ;
}
String host = getHost ( taskInstance . getHost ( ) ) ;
String host = getHost ( taskInstance . getHost ( ) ) ;
byte [ ] head = String . format ( LOG_HEAD_FORMAT ,
byte [ ] head = String . format ( LOG_HEAD_FORMAT ,
taskInstance . getLogPath ( ) ,
taskInstance . getLogPath ( ) ,
@ -134,17 +250,4 @@ public class LoggerServiceImpl implements LoggerService {
return Bytes . concat ( head ,
return Bytes . concat ( head ,
logClient . getLogBytes ( host , PropertyUtils . getInt ( Constants . RPC_PORT , 50051 ) , taskInstance . getLogPath ( ) ) ) ;
logClient . getLogBytes ( host , PropertyUtils . getInt ( Constants . RPC_PORT , 50051 ) , taskInstance . getLogPath ( ) ) ) ;
}
}
/ * *
* get host
*
* @param address address
* @return old version return true , otherwise return false
* /
private String getHost ( String address ) {
if ( Boolean . TRUE . equals ( Host . isOldVersion ( address ) ) ) {
return address ;
}
return Host . of ( address ) . getIp ( ) ;
}
}
}