@ -16,7 +16,6 @@
* /
* /
package org.apache.dolphinscheduler.api.service.impl ;
package org.apache.dolphinscheduler.api.service.impl ;
import org.apache.dolphinscheduler.api.dto.CommandStateCount ;
import org.apache.dolphinscheduler.api.dto.CommandStateCount ;
import org.apache.dolphinscheduler.api.dto.DefineUserDto ;
import org.apache.dolphinscheduler.api.dto.DefineUserDto ;
import org.apache.dolphinscheduler.api.dto.TaskCountDto ;
import org.apache.dolphinscheduler.api.dto.TaskCountDto ;
@ -45,14 +44,14 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat ;
import java.text.MessageFormat ;
import java.util.ArrayList ;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.Date ;
import java.util.Date ;
import java.util.EnumMap ;
import java.util.HashMap ;
import java.util.HashMap ;
import java.util.List ;
import java.util.List ;
import java.util.Map ;
import java.util.Map ;
import java.util.Objects ;
import java.util.stream.Collectors ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Service ;
import org.springframework.stereotype.Service ;
@ -62,8 +61,6 @@ import org.springframework.stereotype.Service;
@Service
@Service
public class DataAnalysisServiceImpl extends BaseService implements DataAnalysisService {
public class DataAnalysisServiceImpl extends BaseService implements DataAnalysisService {
private static final Logger logger = LoggerFactory . getLogger ( DataAnalysisServiceImpl . class ) ;
@Autowired
@Autowired
private ProjectMapper projectMapper ;
private ProjectMapper projectMapper ;
@ -88,10 +85,6 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
@Autowired
@Autowired
private ProcessService processService ;
private ProcessService processService ;
private static final String COMMAND_STATE = "commandState" ;
private static final String ERROR_COMMAND_STATE = "errorCommandState" ;
/ * *
/ * *
* statistical task instance status data
* statistical task instance status data
*
*
@ -137,16 +130,17 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
return result ;
return result ;
}
}
Date start ;
Date start = null ;
Date end ;
Date end = null ;
try {
if ( S tringUtils . isNotEmpt y( startDate ) & & StringUtils . isNotEmpty ( endDate ) ) {
start = DateUtils . getScheduleDate ( startDate ) ;
start = DateUtils . getScheduleDate ( startDate ) ;
end = DateUtils . getScheduleDate ( endDate ) ;
end = DateUtils . getScheduleDate ( endDate ) ;
} catch ( Exception e ) {
if ( Objects . isNull ( start ) | | Objects . isNull ( end ) ) {
logger . error ( e . getMessage ( ) , e ) ;
putErrorRequestParamsMsg ( result ) ;
putErrorRequestParamsMsg ( result ) ;
return result ;
return result ;
}
}
}
Integer [ ] projectIdArray = getProjectIdsArrays ( loginUser , projectId ) ;
Integer [ ] projectIdArray = getProjectIdsArrays ( loginUser , projectId ) ;
List < ExecuteStatusCount > processInstanceStateCounts =
List < ExecuteStatusCount > processInstanceStateCounts =
instanceStateCounter . apply ( start , end , projectIdArray ) ;
instanceStateCounter . apply ( start , end , projectIdArray ) ;
@ -204,79 +198,39 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
* statistics based on task status execution , failure , completion , wait , total
* statistics based on task status execution , failure , completion , wait , total
* /
* /
Date start = null ;
Date start = null ;
if ( StringUtils . isNotEmpty ( startDate ) ) {
start = DateUtils . getScheduleDate ( startDate ) ;
if ( Objects . isNull ( start ) ) {
putErrorRequestParamsMsg ( result ) ;
return result ;
}
}
Date end = null ;
Date end = null ;
if ( StringUtils . isNotEmpty ( endDate ) ) {
if ( startDate ! = null & & endDate ! = null ) {
end = DateUtils . getScheduleDate ( endDate ) ;
try {
if ( Objects . isNull ( end ) ) {
start = DateUtils . getScheduleDate ( startDate ) ;
end = DateUtils . getScheduleDate ( endDate ) ;
} catch ( Exception e ) {
logger . error ( e . getMessage ( ) , e ) ;
putErrorRequestParamsMsg ( result ) ;
putErrorRequestParamsMsg ( result ) ;
return result ;
return result ;
}
}
}
}
Integer [ ] projectIdArray = getProjectIdsArrays ( loginUser , projectId ) ;
Integer [ ] projectIdArray = getProjectIdsArrays ( loginUser , projectId ) ;
// count command state
// count normal command state
List < CommandCount > commandStateCounts =
Map < CommandType , Integer > normalCountCommandCounts = commandMapper . countCommandState ( loginUser . getId ( ) , start , end , projectIdArray )
commandMapper . countCommandState (
. stream ( )
loginUser . getId ( ) ,
. collect ( Collectors . toMap ( CommandCount : : getCommandType , CommandCount : : getCount ) ) ;
start ,
end ,
projectIdArray ) ;
// count error command state
// count error command state
List < CommandCount > errorCommandStateCounts =
Map < CommandType , Integer > errorCommandCounts = errorCommandMapper . countCommandState ( start , end , projectIdArray )
errorCommandMapper . countCommandState (
. stream ( )
start , end , projectIdArray ) ;
. collect ( Collectors . toMap ( CommandCount : : getCommandType , CommandCount : : getCount ) ) ;
// enumMap
Map < CommandType , Map < String , Integer > > dataMap = new EnumMap < > ( CommandType . class ) ;
Map < String , Integer > commonCommand = new HashMap < > ( ) ;
commonCommand . put ( COMMAND_STATE , 0 ) ;
commonCommand . put ( ERROR_COMMAND_STATE , 0 ) ;
// init data map
/ * *
* START_PROCESS , START_CURRENT_TASK_PROCESS , RECOVER_TOLERANCE_FAULT_PROCESS , RECOVER_SUSPENDED_PROCESS ,
START_FAILURE_TASK_PROCESS , COMPLEMENT_DATA , SCHEDULER , REPEAT_RUNNING , PAUSE , STOP , RECOVER_WAITTING_THREAD ;
* /
dataMap . put ( CommandType . START_PROCESS , commonCommand ) ;
dataMap . put ( CommandType . START_CURRENT_TASK_PROCESS , commonCommand ) ;
dataMap . put ( CommandType . RECOVER_TOLERANCE_FAULT_PROCESS , commonCommand ) ;
dataMap . put ( CommandType . RECOVER_SUSPENDED_PROCESS , commonCommand ) ;
dataMap . put ( CommandType . START_FAILURE_TASK_PROCESS , commonCommand ) ;
dataMap . put ( CommandType . COMPLEMENT_DATA , commonCommand ) ;
dataMap . put ( CommandType . SCHEDULER , commonCommand ) ;
dataMap . put ( CommandType . REPEAT_RUNNING , commonCommand ) ;
dataMap . put ( CommandType . PAUSE , commonCommand ) ;
dataMap . put ( CommandType . STOP , commonCommand ) ;
dataMap . put ( CommandType . RECOVER_WAITTING_THREAD , commonCommand ) ;
// put command state
for ( CommandCount executeStatusCount : commandStateCounts ) {
Map < String , Integer > commandStateCountsMap = new HashMap < > ( dataMap . get ( executeStatusCount . getCommandType ( ) ) ) ;
commandStateCountsMap . put ( COMMAND_STATE , executeStatusCount . getCount ( ) ) ;
dataMap . put ( executeStatusCount . getCommandType ( ) , commandStateCountsMap ) ;
}
// put error command state
List < CommandStateCount > list = Arrays . stream ( CommandType . values ( ) )
for ( CommandCount errorExecutionStatus : errorCommandStateCounts ) {
. map ( commandType - > new CommandStateCount (
Map < String , Integer > errorCommandStateCountsMap = new HashMap < > ( dataMap . get ( errorExecutionStatus . getCommandType ( ) ) ) ;
errorCommandCounts . getOrDefault ( commandType , 0 ) ,
errorCommandStateCountsMap . put ( ERROR_COMMAND_STATE , errorExecutionStatus . getCount ( ) ) ;
normalCountCommandCounts . getOrDefault ( commandType , 0 ) ,
dataMap . put ( errorExecutionStatus . getCommandType ( ) , errorCommandStateCountsMap ) ;
commandType )
}
) . collect ( Collectors . toList ( ) ) ;
List < CommandStateCount > list = new ArrayList < > ( ) ;
for ( Map . Entry < CommandType , Map < String , Integer > > next : dataMap . entrySet ( ) ) {
CommandStateCount commandStateCount = new CommandStateCount ( next . getValue ( ) . get ( ERROR_COMMAND_STATE ) ,
next . getValue ( ) . get ( COMMAND_STATE ) , next . getKey ( ) ) ;
list . add ( commandStateCount ) ;
}
result . put ( Constants . DATA_LIST , list ) ;
result . put ( Constants . DATA_LIST , list ) ;
putMsg ( result , Status . SUCCESS ) ;
putMsg ( result , Status . SUCCESS ) ;
@ -311,59 +265,10 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
return result ;
return result ;
}
}
// TODO tasksQueueList and tasksKillList is never updated.
//TODO need to add detail data info
List < String > tasksQueueList = new ArrayList < > ( ) ;
List < String > tasksKillList = new ArrayList < > ( ) ;
Map < String , Integer > dataMap = new HashMap < > ( ) ;
Map < String , Integer > dataMap = new HashMap < > ( ) ;
if ( loginUser . getUserType ( ) = = UserType . ADMIN_USER ) {
dataMap . put ( "taskQueue" , 0 ) ;
dataMap . put ( "taskQueue" , tasksQueueList . size ( ) ) ;
dataMap . put ( "taskKill" , 0 ) ;
dataMap . put ( "taskKill" , tasksKillList . size ( ) ) ;
result . put ( Constants . DATA_LIST , dataMap ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
}
int [ ] tasksQueueIds = new int [ tasksQueueList . size ( ) ] ;
int [ ] tasksKillIds = new int [ tasksKillList . size ( ) ] ;
int i = 0 ;
for ( String taskQueueStr : tasksQueueList ) {
if ( StringUtils . isNotEmpty ( taskQueueStr ) ) {
String [ ] splits = taskQueueStr . split ( "_" ) ;
if ( splits . length > = 4 ) {
tasksQueueIds [ i + + ] = Integer . parseInt ( splits [ 3 ] ) ;
}
}
}
i = 0 ;
for ( String taskKillStr : tasksKillList ) {
if ( StringUtils . isNotEmpty ( taskKillStr ) ) {
String [ ] splits = taskKillStr . split ( "-" ) ;
if ( splits . length = = 2 ) {
tasksKillIds [ i + + ] = Integer . parseInt ( splits [ 1 ] ) ;
}
}
}
Integer taskQueueCount = 0 ;
Integer taskKillCount = 0 ;
Integer [ ] projectIds = getProjectIdsArrays ( loginUser , projectId ) ;
if ( tasksQueueIds . length ! = 0 ) {
taskQueueCount = taskInstanceMapper . countTask (
projectIds ,
tasksQueueIds ) ;
}
if ( tasksKillIds . length ! = 0 ) {
taskKillCount = taskInstanceMapper . countTask ( projectIds , tasksKillIds ) ;
}
dataMap . put ( "taskQueue" , taskQueueCount ) ;
dataMap . put ( "taskKill" , taskKillCount ) ;
result . put ( Constants . DATA_LIST , dataMap ) ;
result . put ( Constants . DATA_LIST , dataMap ) ;
putMsg ( result , Status . SUCCESS ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
return result ;