@ -58,6 +58,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager ;
import org.apache.dolphinscheduler.service.task.TaskPluginManager ;
import org.apache.commons.collections.CollectionUtils ;
import org.apache.commons.collections.CollectionUtils ;
import org.apache.commons.collections.MapUtils ;
import org.apache.commons.lang3.StringUtils ;
import org.apache.commons.lang3.StringUtils ;
import java.util.ArrayList ;
import java.util.ArrayList ;
@ -135,8 +136,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
}
List < TaskDefinitionLog > taskDefinitionLogs = JSONUtils . toList ( taskDefinitionJson , TaskDefinitionLog . class ) ;
List < TaskDefinitionLog > taskDefinitionLogs = JSONUtils . toList ( taskDefinitionJson , TaskDefinitionLog . class ) ;
if ( taskDefinitionLog s. isEmpty ( ) ) {
if ( CollectionUtil s. isEmpty ( taskDefinitionLogs ) ) {
logger . error ( "taskDefinitionJson invalid: {}" , taskDefinitionJson ) ;
logger . warn ( "Parameter taskDefinitionJson is invalid." ) ;
putMsg ( result , Status . DATA_IS_NOT_VALID , taskDefinitionJson ) ;
putMsg ( result , Status . DATA_IS_NOT_VALID , taskDefinitionJson ) ;
return result ;
return result ;
}
}
@ -232,7 +233,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
// upstreamTaskCodes - queryUpStreamTaskCodes
// upstreamTaskCodes - queryUpStreamTaskCodes
Set < Long > diffCode = upstreamTaskCodes . stream ( ) . filter ( code - > ! queryUpStreamTaskCodes . contains ( code ) )
Set < Long > diffCode = upstreamTaskCodes . stream ( ) . filter ( code - > ! queryUpStreamTaskCodes . contains ( code ) )
. collect ( Collectors . toSet ( ) ) ;
. collect ( Collectors . toSet ( ) ) ;
if ( ! diffCode . isEmpty ( ) ) {
if ( CollectionUtils . isNotEmpty ( diffCode ) ) {
String taskCodes = StringUtils . join ( diffCode , Constants . COMMA ) ;
String taskCodes = StringUtils . join ( diffCode , Constants . COMMA ) ;
logger . error ( "Some task definitions with parameter upstreamCodes do not exist, taskDefinitionCodes:{}." ,
logger . error ( "Some task definitions with parameter upstreamCodes do not exist, taskDefinitionCodes:{}." ,
taskCodes ) ;
taskCodes ) ;
@ -351,8 +352,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result ;
return result ;
}
}
List < ProcessTaskRelation > processTaskRelationList =
List < ProcessTaskRelation > processTaskRelationList =
processTaskRelationMapper . queryDownstreamByTaskCode ( taskCode ) ;
processTaskRelationMapper . queryDownstreamByTaskCode ( taskDefinition . get Code ( ) ) ;
if ( ! processTaskRelationList . isEmpty ( ) ) {
if ( CollectionUtils . isNotEmpty ( processTaskRelationList ) ) {
Set < Long > postTaskCodes = processTaskRelationList
Set < Long > postTaskCodes = processTaskRelationList
. stream ( )
. stream ( )
. map ( ProcessTaskRelation : : getPostTaskCode )
. map ( ProcessTaskRelation : : getPostTaskCode )
@ -445,7 +446,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
}
List < ProcessTaskRelation > taskRelationList =
List < ProcessTaskRelation > taskRelationList =
processTaskRelationMapper . queryUpstreamByCode ( projectCode , taskCode ) ;
processTaskRelationMapper . queryUpstreamByCode ( projectCode , taskCode ) ;
if ( ! taskRelationList . isEmpty ( ) ) {
if ( CollectionUtils . isNotEmpty ( taskRelationList ) ) {
logger . info (
logger . info (
"Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}." ,
"Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}." ,
taskCode ) ;
taskCode ) ;
@ -573,13 +574,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
}
}
}
Map < Long , TaskDefinition > queryUpStreamTaskCodeMap ;
Map < Long , TaskDefinition > queryUpStreamTaskCodeMap ;
if ( ! upstreamTaskCodes . isEmpty ( ) ) {
if ( CollectionUtils . isNotEmpty ( upstreamTaskCodes ) ) {
List < TaskDefinition > upstreamTaskDefinitionList = taskDefinitionMapper . queryByCodeList ( upstreamTaskCodes ) ;
List < TaskDefinition > upstreamTaskDefinitionList = taskDefinitionMapper . queryByCodeList ( upstreamTaskCodes ) ;
queryUpStreamTaskCodeMap = upstreamTaskDefinitionList . stream ( )
queryUpStreamTaskCodeMap = upstreamTaskDefinitionList . stream ( )
. collect ( Collectors . toMap ( TaskDefinition : : getCode , taskDefinition - > taskDefinition ) ) ;
. collect ( Collectors . toMap ( TaskDefinition : : getCode , taskDefinition - > taskDefinition ) ) ;
// upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
// upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
upstreamTaskCodes . removeAll ( queryUpStreamTaskCodeMap . keySet ( ) ) ;
upstreamTaskCodes . removeAll ( queryUpStreamTaskCodeMap . keySet ( ) ) ;
if ( ! upstreamTaskCodes . isEmpty ( ) ) {
if ( CollectionUtils . isNotEmpty ( upstreamTaskCodes ) ) {
String notExistTaskCodes = StringUtils . join ( upstreamTaskCodes , Constants . COMMA ) ;
String notExistTaskCodes = StringUtils . join ( upstreamTaskCodes , Constants . COMMA ) ;
logger . error ( "Some task definitions in parameter upstreamTaskCodes do not exist, notExistTaskCodes:{}." ,
logger . error ( "Some task definitions in parameter upstreamTaskCodes do not exist, notExistTaskCodes:{}." ,
notExistTaskCodes ) ;
notExistTaskCodes ) ;
@ -589,7 +590,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
} else {
} else {
queryUpStreamTaskCodeMap = new HashMap < > ( ) ;
queryUpStreamTaskCodeMap = new HashMap < > ( ) ;
}
}
if ( ! upstreamTaskRelations . isEmpty ( ) ) {
if ( CollectionUtils . isNotEmpty ( upstreamTaskCodes ) ) {
ProcessTaskRelation taskRelation = upstreamTaskRelations . get ( 0 ) ;
ProcessTaskRelation taskRelation = upstreamTaskRelations . get ( 0 ) ;
List < ProcessTaskRelation > processTaskRelations =
List < ProcessTaskRelation > processTaskRelations =
processTaskRelationMapper . queryByProcessCode ( projectCode , taskRelation . getProcessDefinitionCode ( ) ) ;
processTaskRelationMapper . queryByProcessCode ( projectCode , taskRelation . getProcessDefinitionCode ( ) ) ;
@ -613,7 +614,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskRelation . setPreTaskVersion ( queryUpStreamTask . getValue ( ) . getVersion ( ) ) ;
taskRelation . setPreTaskVersion ( queryUpStreamTask . getValue ( ) . getVersion ( ) ) ;
processTaskRelationList . add ( taskRelation ) ;
processTaskRelationList . add ( taskRelation ) ;
}
}
if ( queryUpStreamTaskCode Map. isEmpty ( ) & & ! processTaskRelationList . isEmpty ( ) ) {
if ( MapUtils . isEmpty ( queryUpStreamTaskCodeMap ) & & CollectionUtils . isNot Empty ( processTaskRelationList ) ) {
processTaskRelationList . add ( processTaskRelationList . get ( 0 ) ) ;
processTaskRelationList . add ( processTaskRelationList . get ( 0 ) ) ;
}
}
updateDag ( loginUser , result , taskRelation . getProcessDefinitionCode ( ) , processTaskRelations ,
updateDag ( loginUser , result , taskRelation . getProcessDefinitionCode ( ) , processTaskRelations ,
@ -666,7 +667,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if ( switchVersion > 0 ) {
if ( switchVersion > 0 ) {
List < ProcessTaskRelation > taskRelationList =
List < ProcessTaskRelation > taskRelationList =
processTaskRelationMapper . queryUpstreamByCode ( projectCode , taskCode ) ;
processTaskRelationMapper . queryUpstreamByCode ( projectCode , taskCode ) ;
if ( ! taskRelationList . isEmpty ( ) ) {
if ( CollectionUtils . isNotEmpty ( taskRelationList ) ) {
logger . info (
logger . info (
"Task definition has upstream tasks, start handle them after switch task, taskDefinitionCode:{}." ,
"Task definition has upstream tasks, start handle them after switch task, taskDefinitionCode:{}." ,
taskCode ) ;
taskCode ) ;
@ -795,7 +796,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskDefinitionMapper . queryDefineListPaging ( page , projectCode , searchWorkflowName ,
taskDefinitionMapper . queryDefineListPaging ( page , projectCode , searchWorkflowName ,
searchTaskName , taskType , taskExecuteType ) ;
searchTaskName , taskType , taskExecuteType ) ;
List < TaskMainInfo > records = taskMainInfoIPage . getRecords ( ) ;
List < TaskMainInfo > records = taskMainInfoIPage . getRecords ( ) ;
if ( ! records . isEmpty ( ) ) {
if ( CollectionUtils . isNotEmpty ( records ) ) {
Map < Long , TaskMainInfo > taskMainInfoMap = new HashMap < > ( ) ;
Map < Long , TaskMainInfo > taskMainInfoMap = new HashMap < > ( ) ;
for ( TaskMainInfo info : records ) {
for ( TaskMainInfo info : records ) {
taskMainInfoMap . compute ( info . getTaskCode ( ) , ( k , v ) - > {
taskMainInfoMap . compute ( info . getTaskCode ( ) , ( k , v ) - > {