@ -17,8 +17,16 @@
package org.apache.dolphinscheduler.api.service.impl ;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION ;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE ;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE ;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE ;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_VERSION_VIEW ;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION ;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.api.exceptions.ServiceException ;
import org.apache.dolphinscheduler.api.permission.PermissionCheck ;
import org.apache.dolphinscheduler.api.service.ProjectService ;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService ;
import org.apache.dolphinscheduler.api.utils.PageInfo ;
@ -46,7 +54,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper ;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper ;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode ;
import org.apache.dolphinscheduler.api.permission.PermissionCheck ;
import org.apache.dolphinscheduler.service.process.ProcessService ;
import org.apache.dolphinscheduler.service.task.TaskPluginManager ;
@ -73,8 +80,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page ;
import com.google.common.collect.Lists ;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.* ;
/ * *
* task definition service impl
* /
@ -122,8 +127,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
long projectCode ,
String taskDefinitionJson ) {
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION_CREATE ) ;
// check user access for project
Map < String , Object > result =
projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION_CREATE ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
@ -153,7 +159,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
Map < String , Object > resData = new HashMap < > ( ) ;
resData . put ( "total" , taskDefinitionLogs . size ( ) ) ;
resData . put ( "code" , StringUtils . join ( taskDefinitionLogs . stream ( ) . map ( TaskDefinition : : getCode ) . collect ( Collectors . toList ( ) ) , "," ) ) ;
resData . put ( "code" , StringUtils
. join ( taskDefinitionLogs . stream ( ) . map ( TaskDefinition : : getCode ) . collect ( Collectors . toList ( ) ) , "," ) ) ;
putMsg ( result , Status . SUCCESS ) ;
result . put ( Constants . DATA_LIST , resData ) ;
return result ;
@ -177,8 +184,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
String taskDefinitionJsonObj ,
String upstreamCodes ) {
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION_CREATE ) ;
// check user access for project
Map < String , Object > result =
projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION_CREATE ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
@ -211,25 +219,27 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
long taskCode = taskDefinition . getCode ( ) ;
if ( taskCode = = 0 ) {
try {
taskCode = CodeGenerateUtils . getInstance ( ) . genCode ( ) ;
taskDefinition . setCode ( taskCode ) ;
} catch ( CodeGenerateException e ) {
logger . error ( "Generate task definition code error." , e ) ;
putMsg ( result , Status . INTERNAL_SERVER_ERROR_ARGS , taskDefinitionJsonObj ) ;
return result ;
}
taskDefinition . setCode ( CodeGenerateUtils . getInstance ( ) . genCode ( ) ) ;
}
List < ProcessTaskRelationLog > processTaskRelationLogList = Lists . newArrayList ( ) ;
List < ProcessTaskRelationLog > processTaskRelationLogList =
processTaskRelationMapper . queryByProcessCode ( projectCode , processDefinitionCode )
. stream ( )
. map ( ProcessTaskRelationLog : : new )
. collect ( Collectors . toList ( ) ) ;
if ( StringUtils . isNotBlank ( upstreamCodes ) ) {
Set < Long > upstreamTaskCodes = Arrays . stream ( upstreamCodes . split ( Constants . COMMA ) ) . map ( Long : : parseLong ) . collect ( Collectors . toSet ( ) ) ;
Set < Long > upstreamTaskCodes = Arrays . stream ( upstreamCodes . split ( Constants . COMMA ) ) . map ( Long : : parseLong )
. collect ( Collectors . toSet ( ) ) ;
List < TaskDefinition > upstreamTaskDefinitionList = taskDefinitionMapper . queryByCodeList ( upstreamTaskCodes ) ;
Set < Long > queryUpStreamTaskCodes = upstreamTaskDefinitionList . stream ( ) . map ( TaskDefinition : : getCode ) . collect ( Collectors . toSet ( ) ) ;
Set < Long > queryUpStreamTaskCodes =
upstreamTaskDefinitionList . stream ( ) . map ( TaskDefinition : : getCode ) . collect ( Collectors . toSet ( ) ) ;
// upstreamTaskCodes - queryUpStreamTaskCodes
Set < Long > diffCode = upstreamTaskCodes . stream ( ) . filter ( code - > ! queryUpStreamTaskCodes . contains ( code ) ) . collect ( Collectors . toSet ( ) ) ;
Set < Long > diffCode = upstreamTaskCodes . stream ( ) . filter ( code - > ! queryUpStreamTaskCodes . contains ( code ) )
. collect ( Collectors . toSet ( ) ) ;
if ( ! diffCode . isEmpty ( ) ) {
String taskCodes = StringUtils . join ( diffCode , Constants . COMMA ) ;
logger . error ( "Some task definitions with parameter upstreamCodes do not exist, taskDefinitionCodes:{}." , taskCodes ) ;
logger . error ( "Some task definitions with parameter upstreamCodes do not exist, taskDefinitionCodes:{}." ,
taskCodes ) ;
putMsg ( result , Status . TASK_DEFINE_NOT_EXIST , taskCodes ) ;
return result ;
}
@ -243,10 +253,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
processTaskRelationLog . setConditionParams ( "{}" ) ;
processTaskRelationLogList . add ( processTaskRelationLog ) ;
}
List < ProcessTaskRelation > processTaskRelationList = processTaskRelationMapper . queryByProcessCode ( projectCode , processDefinitionCode ) ;
if ( ! processTaskRelationList . isEmpty ( ) ) {
processTaskRelationLogList . addAll ( processTaskRelationList . stream ( ) . map ( ProcessTaskRelationLog : : new ) . collect ( Collectors . toList ( ) ) ) ;
}
} else {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog ( ) ;
processTaskRelationLog . setPreTaskCode ( 0 ) ;
@ -257,22 +263,30 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
processTaskRelationLog . setConditionParams ( "{}" ) ;
processTaskRelationLogList . add ( processTaskRelationLog ) ;
}
int insertResult = processService . saveTaskRelation ( loginUser , projectCode , processDefinition . getCode ( ) , processDefinition . getVersion ( ) ,
processTaskRelationLogList , Lists . newArrayList ( ) , Boolean . TRUE ) ;
int insertResult = processService . saveTaskRelation ( loginUser , projectCode , processDefinition . getCode ( ) ,
processDefinition . getVersion ( ) ,
processTaskRelationLogList , Lists . newArrayList ( ) , Boolean . TRUE ) ;
if ( insertResult ! = Constants . EXIT_CODE_SUCCESS ) {
logger . error ( "Save new version process task relations error, processDefinitionCode:{}, processDefinitionVersion:{}." , processDefinition . getCode ( ) , processDefinition . getVersion ( ) ) ;
logger . error (
"Save new version process task relations error, processDefinitionCode:{}, processDefinitionVersion:{}." ,
processDefinition . getCode ( ) , processDefinition . getVersion ( ) ) ;
putMsg ( result , Status . CREATE_PROCESS_TASK_RELATION_ERROR ) ;
throw new ServiceException ( Status . CREATE_PROCESS_TASK_RELATION_ERROR ) ;
} else
logger . info ( "Save new version process task relations complete, processDefinitionCode:{}, processDefinitionVersion:{}." , processDefinition . getCode ( ) , processDefinition . getVersion ( ) ) ;
logger . info (
"Save new version process task relations complete, processDefinitionCode:{}, processDefinitionVersion:{}." ,
processDefinition . getCode ( ) , processDefinition . getVersion ( ) ) ;
int saveTaskResult = processService . saveTaskDefine ( loginUser , projectCode , Lists . newArrayList ( taskDefinition ) , Boolean . TRUE ) ;
int saveTaskResult =
processService . saveTaskDefine ( loginUser , projectCode , Lists . newArrayList ( taskDefinition ) , Boolean . TRUE ) ;
if ( saveTaskResult = = Constants . DEFINITION_FAILURE ) {
logger . error ( "Save task definition error, projectCode:{}, taskDefinitionCode:{}." , projectCode , taskDefinition . getCode ( ) ) ;
logger . error ( "Save task definition error, projectCode:{}, taskDefinitionCode:{}." , projectCode ,
taskDefinition . getCode ( ) ) ;
putMsg ( result , Status . CREATE_TASK_DEFINITION_ERROR ) ;
throw new ServiceException ( Status . CREATE_TASK_DEFINITION_ERROR ) ;
} else
logger . info ( "Save task definition complete, projectCode:{}, taskDefinitionCode:{}." , projectCode , taskDefinition . getCode ( ) ) ;
logger . info ( "Save task definition complete, projectCode:{}, taskDefinitionCode:{}." , projectCode ,
taskDefinition . getCode ( ) ) ;
putMsg ( result , Status . SUCCESS ) ;
result . put ( Constants . DATA_LIST , taskDefinition ) ;
return result ;
@ -287,10 +301,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
* @param taskName task name
* /
@Override
public Map < String , Object > queryTaskDefinitionByName ( User loginUser , long projectCode , long processCode , String taskName ) {
public Map < String , Object > queryTaskDefinitionByName ( User loginUser , long projectCode , long processCode ,
String taskName ) {
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION ) ;
// check user access for project
Map < String , Object > result =
projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
@ -318,8 +334,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Override
public Map < String , Object > deleteTaskDefinitionByCode ( User loginUser , long projectCode , long taskCode ) {
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION_DELETE ) ;
// check user access for project
Map < String , Object > result =
projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION_DELETE ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
@ -335,33 +352,42 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result ;
}
if ( processService . isTaskOnline ( taskCode ) & & taskDefinition . getFlag ( ) = = Flag . YES ) {
logger . warn ( "Task definition can not be deleted due to task state online, taskDefinitionCode:{}." , taskCode ) ;
logger . warn ( "Task definition can not be deleted due to task state online, taskDefinitionCode:{}." ,
taskCode ) ;
putMsg ( result , Status . TASK_DEFINE_STATE_ONLINE , taskCode ) ;
return result ;
}
List < ProcessTaskRelation > processTaskRelationList = processTaskRelationMapper . queryDownstreamByTaskCode ( taskCode ) ;
List < ProcessTaskRelation > processTaskRelationList =
processTaskRelationMapper . queryDownstreamByTaskCode ( taskCode ) ;
if ( ! processTaskRelationList . isEmpty ( ) ) {
Set < Long > postTaskCodes = processTaskRelationList
. stream ( )
. map ( ProcessTaskRelation : : getPostTaskCode )
. collect ( Collectors . toSet ( ) ) ;
String postTaskCodesStr = StringUtils . join ( postTaskCodes , "," ) ;
logger . warn ( "Task definition can not be deleted due to downstream tasks, taskDefinitionCode:{}, downstreamTaskCodes:{}" ,
logger . warn (
"Task definition can not be deleted due to downstream tasks, taskDefinitionCode:{}, downstreamTaskCodes:{}" ,
taskCode , postTaskCodesStr ) ;
putMsg ( result , Status . TASK_HAS_DOWNSTREAM , postTaskCodesStr ) ;
return result ;
}
int delete = taskDefinitionMapper . deleteByCode ( taskCode ) ;
if ( delete > 0 ) {
List < ProcessTaskRelation > taskRelationList = processTaskRelationMapper . queryUpstreamByCode ( projectCode , taskCode ) ;
List < ProcessTaskRelation > taskRelationList =
processTaskRelationMapper . queryUpstreamByCode ( projectCode , taskCode ) ;
if ( ! taskRelationList . isEmpty ( ) ) {
logger . info ( "Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}." , taskCode ) ;
logger . info (
"Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}." ,
taskCode ) ;
long processDefinitionCode = taskRelationList . get ( 0 ) . getProcessDefinitionCode ( ) ;
List < ProcessTaskRelation > processTaskRelations = processTaskRelationMapper . queryByProcessCode ( projectCode , processDefinitionCode ) ;
List < ProcessTaskRelation > relationList = processTaskRelations . stream ( ) . filter ( r - > r . getPostTaskCode ( ) ! = taskCode ) . collect ( Collectors . toList ( ) ) ;
List < ProcessTaskRelation > processTaskRelations =
processTaskRelationMapper . queryByProcessCode ( projectCode , processDefinitionCode ) ;
List < ProcessTaskRelation > relationList = processTaskRelations . stream ( )
. filter ( r - > r . getPostTaskCode ( ) ! = taskCode ) . collect ( Collectors . toList ( ) ) ;
updateDag ( loginUser , result , processDefinitionCode , relationList , Lists . newArrayList ( ) ) ;
} else {
logger . info ( "Task definition delete complete, projectCode:{}, taskDefinitionCode:{}." , projectCode , taskCode ) ;
logger . info ( "Task definition delete complete, projectCode:{}, taskDefinitionCode:{}." , projectCode ,
taskCode ) ;
putMsg ( result , Status . SUCCESS ) ;
}
} else {
@ -372,7 +398,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result ;
}
private void updateDag ( User loginUser , Map < String , Object > result , long processDefinitionCode , List < ProcessTaskRelation > processTaskRelationList ,
private void updateDag ( User loginUser , Map < String , Object > result , long processDefinitionCode ,
List < ProcessTaskRelation > processTaskRelationList ,
List < TaskDefinitionLog > taskDefinitionLogs ) {
ProcessDefinition processDefinition = processDefinitionMapper . queryByCode ( processDefinitionCode ) ;
if ( processDefinition = = null ) {
@ -381,19 +408,27 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
int insertVersion = processService . saveProcessDefine ( loginUser , processDefinition , Boolean . TRUE , Boolean . TRUE ) ;
if ( insertVersion < = 0 ) {
logger . error ( "Update process definition error, projectCode:{}, processDefinitionCode:{}." , processDefinition . getProjectCode ( ) , processDefinitionCode ) ;
logger . error ( "Update process definition error, projectCode:{}, processDefinitionCode:{}." ,
processDefinition . getProjectCode ( ) , processDefinitionCode ) ;
throw new ServiceException ( Status . UPDATE_PROCESS_DEFINITION_ERROR ) ;
} else
logger . info ( "Save new version process definition complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}." , processDefinition . getProjectCode ( ) , processDefinitionCode , insertVersion ) ;
List < ProcessTaskRelationLog > relationLogs = processTaskRelationList . stream ( ) . map ( ProcessTaskRelationLog : : new ) . collect ( Collectors . toList ( ) ) ;
int insertResult = processService . saveTaskRelation ( loginUser , processDefinition . getProjectCode ( ) , processDefinition . getCode ( ) ,
logger . info (
"Save new version process definition complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}." ,
processDefinition . getProjectCode ( ) , processDefinitionCode , insertVersion ) ;
List < ProcessTaskRelationLog > relationLogs =
processTaskRelationList . stream ( ) . map ( ProcessTaskRelationLog : : new ) . collect ( Collectors . toList ( ) ) ;
int insertResult = processService . saveTaskRelation ( loginUser , processDefinition . getProjectCode ( ) ,
processDefinition . getCode ( ) ,
insertVersion , relationLogs , taskDefinitionLogs , Boolean . TRUE ) ;
if ( insertResult = = Constants . EXIT_CODE_SUCCESS ) {
logger . info ( "Save new version task relations complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}." , processDefinition . getProjectCode ( ) , processDefinitionCode , insertVersion ) ;
logger . info (
"Save new version task relations complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}." ,
processDefinition . getProjectCode ( ) , processDefinitionCode , insertVersion ) ;
putMsg ( result , Status . SUCCESS ) ;
result . put ( Constants . DATA_LIST , processDefinition ) ;
} else {
logger . error ( "Update task relations error, projectCode:{}, processDefinitionCode:{}." , processDefinition . getProjectCode ( ) , processDefinitionCode ) ;
logger . error ( "Update task relations error, projectCode:{}, processDefinitionCode:{}." ,
processDefinition . getProjectCode ( ) , processDefinitionCode ) ;
putMsg ( result , Status . UPDATE_PROCESS_DEFINITION_ERROR ) ;
throw new ServiceException ( Status . UPDATE_PROCESS_DEFINITION_ERROR ) ;
}
@ -409,18 +444,25 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
* /
@Transactional
@Override
public Map < String , Object > updateTaskDefinition ( User loginUser , long projectCode , long taskCode , String taskDefinitionJsonObj ) {
public Map < String , Object > updateTaskDefinition ( User loginUser , long projectCode , long taskCode ,
String taskDefinitionJsonObj ) {
Map < String , Object > result = new HashMap < > ( ) ;
TaskDefinitionLog taskDefinitionToUpdate = updateTask ( loginUser , projectCode , taskCode , taskDefinitionJsonObj , result ) ;
TaskDefinitionLog taskDefinitionToUpdate =
updateTask ( loginUser , projectCode , taskCode , taskDefinitionJsonObj , result ) ;
if ( taskDefinitionToUpdate = = null ) {
return result ;
}
List < ProcessTaskRelation > taskRelationList = processTaskRelationMapper . queryUpstreamByCode ( projectCode , taskCode ) ;
List < ProcessTaskRelation > taskRelationList =
processTaskRelationMapper . queryUpstreamByCode ( projectCode , taskCode ) ;
if ( ! taskRelationList . isEmpty ( ) ) {
logger . info ( "Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}." , taskCode ) ;
logger . info (
"Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}." ,
taskCode ) ;
long processDefinitionCode = taskRelationList . get ( 0 ) . getProcessDefinitionCode ( ) ;
List < ProcessTaskRelation > processTaskRelations = processTaskRelationMapper . queryByProcessCode ( projectCode , processDefinitionCode ) ;
updateDag ( loginUser , result , processDefinitionCode , processTaskRelations , Lists . newArrayList ( taskDefinitionToUpdate ) ) ;
List < ProcessTaskRelation > processTaskRelations =
processTaskRelationMapper . queryByProcessCode ( projectCode , processDefinitionCode ) ;
updateDag ( loginUser , result , processDefinitionCode , processTaskRelations ,
Lists . newArrayList ( taskDefinitionToUpdate ) ) ;
}
logger . info ( "Update task definition complete, projectCode:{}, taskDefinitionCode:{}." , projectCode , taskCode ) ;
result . put ( Constants . DATA_LIST , taskCode ) ;
@ -428,10 +470,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result ;
}
private TaskDefinitionLog updateTask ( User loginUser , long projectCode , long taskCode , String taskDefinitionJsonObj , Map < String , Object > result ) {
private TaskDefinitionLog updateTask ( User loginUser , long projectCode , long taskCode , String taskDefinitionJsonObj ,
Map < String , Object > result ) {
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
result . putAll ( projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION_UPDATE ) ) ;
// check user access for project
result . putAll ( projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION_UPDATE ) ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return null ;
}
@ -444,12 +487,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if ( processService . isTaskOnline ( taskCode ) & & taskDefinition . getFlag ( ) = = Flag . YES ) {
// if stream, can update task definition without online check
if ( taskDefinition . getTaskExecuteType ( ) ! = TaskExecuteType . STREAM ) {
logger . warn ( "Only {} type task can be updated without online check, taskDefinitionCode:{}." , TaskExecuteType . STREAM , taskCode ) ;
logger . warn ( "Only {} type task can be updated without online check, taskDefinitionCode:{}." ,
TaskExecuteType . STREAM , taskCode ) ;
putMsg ( result , Status . NOT_SUPPORT_UPDATE_TASK_DEFINITION ) ;
return null ;
}
}
TaskDefinitionLog taskDefinitionToUpdate = JSONUtils . parseObject ( taskDefinitionJsonObj , TaskDefinitionLog . class ) ;
TaskDefinitionLog taskDefinitionToUpdate =
JSONUtils . parseObject ( taskDefinitionJsonObj , TaskDefinitionLog . class ) ;
if ( taskDefinition . equals ( taskDefinitionToUpdate ) ) {
logger . warn ( "Task definition does not need update because no change, taskDefinitionCode:{}." , taskCode ) ;
return null ;
@ -464,13 +509,15 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
. taskParams ( taskDefinitionToUpdate . getTaskParams ( ) )
. dependence ( taskDefinitionToUpdate . getDependence ( ) )
. build ( ) ) ) {
logger . warn ( "Task definition parameters are invalid, taskDefinitionName:{}." , taskDefinitionToUpdate . getName ( ) ) ;
logger . warn ( "Task definition parameters are invalid, taskDefinitionName:{}." ,
taskDefinitionToUpdate . getName ( ) ) ;
putMsg ( result , Status . PROCESS_NODE_S_PARAMETER_INVALID , taskDefinitionToUpdate . getName ( ) ) ;
return null ;
}
Integer version = taskDefinitionLogMapper . queryMaxVersionForDefinition ( taskCode ) ;
if ( version = = null | | version = = 0 ) {
logger . error ( "Max version task definitionLog can not be found in database, taskDefinitionCode:{}." , taskCode ) ;
logger . error ( "Max version task definitionLog can not be found in database, taskDefinitionCode:{}." ,
taskCode ) ;
putMsg ( result , Status . DATA_IS_NOT_VALID , taskCode ) ;
return null ;
}
@ -490,11 +537,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskDefinitionToUpdate . setId ( null ) ;
int insert = taskDefinitionLogMapper . insert ( taskDefinitionToUpdate ) ;
if ( ( update & insert ) ! = 1 ) {
logger . error ( "Update task definition or definitionLog error, projectCode:{}, taskDefinitionCode:{}." , projectCode , taskCode ) ;
logger . error ( "Update task definition or definitionLog error, projectCode:{}, taskDefinitionCode:{}." ,
projectCode , taskCode ) ;
putMsg ( result , Status . UPDATE_TASK_DEFINITION_ERROR ) ;
throw new ServiceException ( Status . UPDATE_TASK_DEFINITION_ERROR ) ;
} else
logger . info ( "Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}." ,
logger . info (
"Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}." ,
projectCode , taskCode , taskDefinitionToUpdate . getVersion ( ) ) ;
return taskDefinitionToUpdate ;
}
@ -510,17 +559,22 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
* @return update result code
* /
@Override
public Map < String , Object > updateTaskWithUpstream ( User loginUser , long projectCode , long taskCode , String taskDefinitionJsonObj , String upstreamCodes ) {
public Map < String , Object > updateTaskWithUpstream ( User loginUser , long projectCode , long taskCode ,
String taskDefinitionJsonObj , String upstreamCodes ) {
Map < String , Object > result = new HashMap < > ( ) ;
TaskDefinitionLog taskDefinitionToUpdate = updateTask ( loginUser , projectCode , taskCode , taskDefinitionJsonObj , result ) ;
TaskDefinitionLog taskDefinitionToUpdate =
updateTask ( loginUser , projectCode , taskCode , taskDefinitionJsonObj , result ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS & & taskDefinitionToUpdate = = null ) {
return result ;
}
List < ProcessTaskRelation > upstreamTaskRelations = processTaskRelationMapper . queryUpstreamByCode ( projectCode , taskCode ) ;
Set < Long > upstreamCodeSet = upstreamTaskRelations . stream ( ) . map ( ProcessTaskRelation : : getPreTaskCode ) . collect ( Collectors . toSet ( ) ) ;
List < ProcessTaskRelation > upstreamTaskRelations =
processTaskRelationMapper . queryUpstreamByCode ( projectCode , taskCode ) ;
Set < Long > upstreamCodeSet =
upstreamTaskRelations . stream ( ) . map ( ProcessTaskRelation : : getPreTaskCode ) . collect ( Collectors . toSet ( ) ) ;
Set < Long > upstreamTaskCodes = Collections . emptySet ( ) ;
if ( StringUtils . isNotEmpty ( upstreamCodes ) ) {
upstreamTaskCodes = Arrays . stream ( upstreamCodes . split ( Constants . COMMA ) ) . map ( Long : : parseLong ) . collect ( Collectors . toSet ( ) ) ;
upstreamTaskCodes = Arrays . stream ( upstreamCodes . split ( Constants . COMMA ) ) . map ( Long : : parseLong )
. collect ( Collectors . toSet ( ) ) ;
}
if ( CollectionUtils . isEqualCollection ( upstreamCodeSet , upstreamTaskCodes ) & & taskDefinitionToUpdate = = null ) {
putMsg ( result , Status . SUCCESS ) ;
@ -533,12 +587,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
Map < Long , TaskDefinition > queryUpStreamTaskCodeMap ;
if ( ! upstreamTaskCodes . isEmpty ( ) ) {
List < TaskDefinition > upstreamTaskDefinitionList = taskDefinitionMapper . queryByCodeList ( upstreamTaskCodes ) ;
queryUpStreamTaskCodeMap = upstreamTaskDefinitionList . stream ( ) . collect ( Collectors . toMap ( TaskDefinition : : getCode , taskDefinition - > taskDefinition ) ) ;
queryUpStreamTaskCodeMap = upstreamTaskDefinitionList . stream ( )
. collect ( Collectors . toMap ( TaskDefinition : : getCode , taskDefinition - > taskDefinition ) ) ;
// upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
upstreamTaskCodes . removeAll ( queryUpStreamTaskCodeMap . keySet ( ) ) ;
if ( ! upstreamTaskCodes . isEmpty ( ) ) {
String notExistTaskCodes = StringUtils . join ( upstreamTaskCodes , Constants . COMMA ) ;
logger . error ( "Some task definitions in parameter upstreamTaskCodes do not exist, notExistTaskCodes:{}." , notExistTaskCodes ) ;
logger . error ( "Some task definitions in parameter upstreamTaskCodes do not exist, notExistTaskCodes:{}." ,
notExistTaskCodes ) ;
putMsg ( result , Status . TASK_DEFINE_NOT_EXIST , notExistTaskCodes ) ;
return result ;
}
@ -547,12 +603,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
if ( ! upstreamTaskRelations . isEmpty ( ) ) {
ProcessTaskRelation taskRelation = upstreamTaskRelations . get ( 0 ) ;
List < ProcessTaskRelation > processTaskRelations = processTaskRelationMapper . queryByProcessCode ( projectCode , taskRelation . getProcessDefinitionCode ( ) ) ;
List < ProcessTaskRelation > processTaskRelations =
processTaskRelationMapper . queryByProcessCode ( projectCode , taskRelation . getProcessDefinitionCode ( ) ) ;
List < ProcessTaskRelation > processTaskRelationList = Lists . newArrayList ( processTaskRelations ) ;
List < ProcessTaskRelation > relationList = Lists . newArrayList ( ) ;
for ( ProcessTaskRelation processTaskRelation : processTaskRelationList ) {
if ( processTaskRelation . getPostTaskCode ( ) = = taskCode ) {
if ( queryUpStreamTaskCodeMap . containsKey ( processTaskRelation . getPreTaskCode ( ) ) & & processTaskRelation . getPreTaskCode ( ) ! = 0L ) {
if ( queryUpStreamTaskCodeMap . containsKey ( processTaskRelation . getPreTaskCode ( ) )
& & processTaskRelation . getPreTaskCode ( ) ! = 0L ) {
queryUpStreamTaskCodeMap . remove ( processTaskRelation . getPreTaskCode ( ) ) ;
} else {
processTaskRelation . setPreTaskCode ( 0L ) ;
@ -570,16 +628,17 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if ( queryUpStreamTaskCodeMap . isEmpty ( ) & & ! processTaskRelationList . isEmpty ( ) ) {
processTaskRelationList . add ( processTaskRelationList . get ( 0 ) ) ;
}
updateDag ( loginUser , result , taskRelation . getProcessDefinitionCode ( ) , processTaskRelations , Lists . newArrayList ( taskDefinitionToUpdate ) ) ;
updateDag ( loginUser , result , taskRelation . getProcessDefinitionCode ( ) , processTaskRelations ,
Lists . newArrayList ( taskDefinitionToUpdate ) ) ;
}
logger . info ( "Update task with upstream tasks complete, projectCode:{}, taskDefinitionCode:{}, upstreamTaskCodes:{}." ,
logger . info (
"Update task with upstream tasks complete, projectCode:{}, taskDefinitionCode:{}, upstreamTaskCodes:{}." ,
projectCode , taskCode , upstreamTaskCodes ) ;
result . put ( Constants . DATA_LIST , taskCode ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
}
/ * *
* switch task definition
*
@ -592,13 +651,15 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Override
public Map < String , Object > switchVersion ( User loginUser , long projectCode , long taskCode , int version ) {
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode , WORKFLOW_SWITCH_TO_THIS_VERSION ) ;
// check user access for project
Map < String , Object > result =
projectService . checkProjectAndAuth ( loginUser , project , projectCode , WORKFLOW_SWITCH_TO_THIS_VERSION ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
if ( processService . isTaskOnline ( taskCode ) ) {
logger . warn ( "Task definition version can not be switched due to process definition is {}, taskDefinitionCode:{}." ,
logger . warn (
"Task definition version can not be switched due to process definition is {}, taskDefinitionCode:{}." ,
ReleaseState . ONLINE . getDescp ( ) , taskCode ) ;
putMsg ( result , Status . PROCESS_DEFINE_STATE_ONLINE ) ;
return result ;
@ -609,20 +670,28 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg ( result , Status . TASK_DEFINE_NOT_EXIST , String . valueOf ( taskCode ) ) ;
return result ;
}
TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper . queryByDefinitionCodeAndVersion ( taskCode , version ) ;
TaskDefinitionLog taskDefinitionUpdate =
taskDefinitionLogMapper . queryByDefinitionCodeAndVersion ( taskCode , version ) ;
taskDefinitionUpdate . setUserId ( loginUser . getId ( ) ) ;
taskDefinitionUpdate . setUpdateTime ( new Date ( ) ) ;
taskDefinitionUpdate . setId ( taskDefinition . getId ( ) ) ;
int switchVersion = taskDefinitionMapper . updateById ( taskDefinitionUpdate ) ;
if ( switchVersion > 0 ) {
List < ProcessTaskRelation > taskRelationList = processTaskRelationMapper . queryUpstreamByCode ( projectCode , taskCode ) ;
List < ProcessTaskRelation > taskRelationList =
processTaskRelationMapper . queryUpstreamByCode ( projectCode , taskCode ) ;
if ( ! taskRelationList . isEmpty ( ) ) {
logger . info ( "Task definition has upstream tasks, start handle them after switch task, taskDefinitionCode:{}." , taskCode ) ;
logger . info (
"Task definition has upstream tasks, start handle them after switch task, taskDefinitionCode:{}." ,
taskCode ) ;
long processDefinitionCode = taskRelationList . get ( 0 ) . getProcessDefinitionCode ( ) ;
List < ProcessTaskRelation > processTaskRelations = processTaskRelationMapper . queryByProcessCode ( projectCode , processDefinitionCode ) ;
updateDag ( loginUser , result , processDefinitionCode , processTaskRelations , Lists . newArrayList ( taskDefinitionUpdate ) ) ;
List < ProcessTaskRelation > processTaskRelations =
processTaskRelationMapper . queryByProcessCode ( projectCode , processDefinitionCode ) ;
updateDag ( loginUser , result , processDefinitionCode , processTaskRelations ,
Lists . newArrayList ( taskDefinitionUpdate ) ) ;
} else {
logger . info ( "Task definition version switch complete, switch task version to {}, taskDefinitionCode:{}." , version , taskCode ) ;
logger . info (
"Task definition version switch complete, switch task version to {}, taskDefinitionCode:{}." ,
version , taskCode ) ;
putMsg ( result , Status . SUCCESS ) ;
}
} else {
@ -641,7 +710,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
Result result = new Result ( ) ;
Project project = projectMapper . queryByCode ( projectCode ) ;
// check user access for project
Map < String , Object > checkResult = projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_VERSION_VIEW ) ;
Map < String , Object > checkResult =
projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_VERSION_VIEW ) ;
Status resultStatus = ( Status ) checkResult . get ( Constants . STATUS ) ;
if ( resultStatus ! = Status . SUCCESS ) {
putMsg ( result , resultStatus ) ;
@ -649,7 +719,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
PageInfo < TaskDefinitionLog > pageInfo = new PageInfo < > ( pageNo , pageSize ) ;
Page < TaskDefinitionLog > page = new Page < > ( pageNo , pageSize ) ;
IPage < TaskDefinitionLog > taskDefinitionVersionsPaging = taskDefinitionLogMapper . queryTaskDefinitionVersionsPaging ( page , taskCode , projectCode ) ;
IPage < TaskDefinitionLog > taskDefinitionVersionsPaging =
taskDefinitionLogMapper . queryTaskDefinitionVersionsPaging ( page , taskCode , projectCode ) ;
List < TaskDefinitionLog > taskDefinitionLogs = taskDefinitionVersionsPaging . getRecords ( ) ;
pageInfo . setTotalList ( taskDefinitionLogs ) ;
@ -662,8 +733,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Override
public Map < String , Object > deleteByCodeAndVersion ( User loginUser , long projectCode , long taskCode , int version ) {
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION_DELETE ) ;
// check user access for project
Map < String , Object > result =
projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION_DELETE ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
@ -674,14 +746,16 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg ( result , Status . TASK_DEFINE_NOT_EXIST , String . valueOf ( taskCode ) ) ;
} else {
if ( taskDefinition . getVersion ( ) = = version ) {
logger . warn ( "Task definition can not be deleted due to version is being used, projectCode:{}, taskDefinitionCode:{}, version:{}." ,
logger . warn (
"Task definition can not be deleted due to version is being used, projectCode:{}, taskDefinitionCode:{}, version:{}." ,
projectCode , taskCode , version ) ;
putMsg ( result , Status . MAIN_TABLE_USING_VERSION ) ;
return result ;
}
int delete = taskDefinitionLogMapper . deleteByCodeAndVersion ( taskCode , version ) ;
if ( delete > 0 ) {
logger . info ( "Task definition version delete complete, projectCode:{}, taskDefinitionCode:{}, version:{}." ,
logger . info (
"Task definition version delete complete, projectCode:{}, taskDefinitionCode:{}, version:{}." ,
projectCode , taskCode , version ) ;
putMsg ( result , Status . SUCCESS ) ;
} else {
@ -696,8 +770,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Override
public Map < String , Object > queryTaskDefinitionDetail ( User loginUser , long projectCode , long taskCode ) {
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION ) ;
// check user access for project
Map < String , Object > result =
projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
@ -724,8 +799,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
Integer pageSize ) {
Result result = new Result ( ) ;
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > checkResult = projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION ) ;
// check user access for project
Map < String , Object > checkResult =
projectService . checkProjectAndAuth ( loginUser , project , projectCode , TASK_DEFINITION ) ;
Status resultStatus = ( Status ) checkResult . get ( Constants . STATUS ) ;
if ( resultStatus ! = Status . SUCCESS ) {
putMsg ( result , resultStatus ) ;
@ -733,8 +809,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
taskType = taskType = = null ? StringUtils . EMPTY : taskType ;
Page < TaskMainInfo > page = new Page < > ( pageNo , pageSize ) ;
IPage < TaskMainInfo > taskMainInfoIPage = taskDefinitionMapper . queryDefineListPaging ( page , projectCode , searchWorkflowName ,
searchTaskName , taskType , taskExecuteType ) ;
IPage < TaskMainInfo > taskMainInfoIPage =
taskDefinitionMapper . queryDefineListPaging ( page , projectCode , searchWorkflowName ,
searchTaskName , taskType , taskExecuteType ) ;
List < TaskMainInfo > records = taskMainInfoIPage . getRecords ( ) ;
if ( ! records . isEmpty ( ) ) {
Map < Long , TaskMainInfo > taskMainInfoMap = new HashMap < > ( ) ;
@ -800,10 +877,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
* /
@Transactional
@Override
public Map < String , Object > releaseTaskDefinition ( User loginUser , long projectCode , long code , ReleaseState releaseState ) {
public Map < String , Object > releaseTaskDefinition ( User loginUser , long projectCode , long code ,
ReleaseState releaseState ) {
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode , null ) ;
// check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode , null ) ;
Status resultStatus = ( Status ) result . get ( Constants . STATUS ) ;
if ( resultStatus ! = Status . SUCCESS ) {
return result ;
@ -817,7 +895,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg ( result , Status . TASK_DEFINE_NOT_EXIST , String . valueOf ( code ) ) ;
return result ;
}
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper . queryByDefinitionCodeAndVersion ( code , taskDefinition . getVersion ( ) ) ;
TaskDefinitionLog taskDefinitionLog =
taskDefinitionLogMapper . queryByDefinitionCodeAndVersion ( code , taskDefinition . getVersion ( ) ) ;
if ( taskDefinitionLog = = null ) {
logger . error ( "Task definition does not exist, taskDefinitionCode:{}." , code ) ;
putMsg ( result , Status . TASK_DEFINE_NOT_EXIST , String . valueOf ( code ) ) ;
@ -831,8 +910,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
case ONLINE :
String resourceIds = taskDefinition . getResourceIds ( ) ;
if ( StringUtils . isNotBlank ( resourceIds ) ) {
Integer [ ] resourceIdArray = Arrays . stream ( resourceIds . split ( "," ) ) . map ( Integer : : parseInt ) . toArray ( Integer [ ] : : new ) ;
PermissionCheck < Integer > permissionCheck = new PermissionCheck ( AuthorizationType . RESOURCE_FILE_ID , processService , resourceIdArray , loginUser . getId ( ) , logger ) ;
Integer [ ] resourceIdArray =
Arrays . stream ( resourceIds . split ( "," ) ) . map ( Integer : : parseInt ) . toArray ( Integer [ ] : : new ) ;
PermissionCheck < Integer > permissionCheck = new PermissionCheck ( AuthorizationType . RESOURCE_FILE_ID ,
processService , resourceIdArray , loginUser . getId ( ) , logger ) ;
try {
permissionCheck . checkPermission ( ) ;
} catch ( Exception e ) {
@ -856,7 +937,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg ( result , Status . UPDATE_TASK_DEFINITION_ERROR ) ;
throw new ServiceException ( Status . UPDATE_TASK_DEFINITION_ERROR ) ;
}
logger . error ( "Update taskDefinition state or taskDefinitionLog state to complete, taskDefinitionCode:{}." , code ) ;
logger . error ( "Update taskDefinition state or taskDefinitionLog state to complete, taskDefinitionCode:{}." ,
code ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
}