@ -17,11 +17,12 @@
package org.apache.dolphinscheduler.api.service ;
import com.alibaba.fastjson.JSON ;
import com.alibaba.fastjson.JSONArray ;
import com.alibaba.fastjson.JSONObject ;
import com.baomidou.mybatisplus.core.metadata.IPage ;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page ;
import com.fasterxml.jackson.core.JsonProcessingException ;
import com.fasterxml.jackson.databind.JsonNode ;
import com.fasterxml.jackson.databind.node.ArrayNode ;
import com.fasterxml.jackson.databind.node.ObjectNode ;
import org.apache.dolphinscheduler.api.dto.ProcessMeta ;
import org.apache.dolphinscheduler.api.dto.treeview.Instance ;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto ;
@ -73,6 +74,11 @@ public class ProcessDefinitionService extends BaseDAGService {
private static final Logger logger = LoggerFactory . getLogger ( ProcessDefinitionService . class ) ;
private static final String PROCESSDEFINITIONID = "processDefinitionId" ;
private static final String RELEASESTATE = "releaseState" ;
private static final String TASKS = "tasks" ;
@Autowired
private ProjectMapper projectMapper ;
@ -99,13 +105,13 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* create process definition
*
* @param loginUser login user
* @param projectName project name
* @param name process definition name
* @param loginUser login user
* @param projectName project name
* @param name process definition name
* @param processDefinitionJson process definition json
* @param desc description
* @param locations locations for nodes
* @param connects connects for nodes
* @param desc description
* @param locations locations for nodes
* @param connects connects for nodes
* @return create result code
* @throws JsonProcessingException JsonProcessingException
* /
@ -159,30 +165,34 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefine . setUpdateTime ( now ) ;
processDefine . setFlag ( Flag . YES ) ;
processDefineMapper . insert ( processDefine ) ;
// return processDefinition object with ID
result . put ( Constants . DATA_LIST , processDefineMapper . selectById ( processDefine . getId ( ) ) ) ;
putMsg ( result , Status . SUCCESS ) ;
result . put ( "processDefinitionId" , processDefine . getId ( ) ) ;
result . put ( "processDefinitionId" , processDefine . getId ( ) ) ;
return result ;
}
/ * *
* get resource ids
*
* @param processData process data
* @return resource ids
* /
private String getResourceIds ( ProcessData processData ) {
List < TaskNode > tasks = processData . getTasks ( ) ;
Set < Integer > resourceIds = new HashSet < > ( ) ;
for ( TaskNode taskNode : tasks ) {
for ( TaskNode taskNode : tasks ) {
String taskParameter = taskNode . getParams ( ) ;
AbstractParameters params = TaskParametersUtils . getParameters ( taskNode . getType ( ) , taskParameter ) ;
AbstractParameters params = TaskParametersUtils . getParameters ( taskNode . getType ( ) , taskParameter ) ;
if ( CollectionUtils . isNotEmpty ( params . getResourceFilesList ( ) ) ) {
Set < Integer > tempSet = params . getResourceFilesList ( ) . stream ( ) . map ( t - > t . getId ( ) ) . collect ( Collectors . toSet ( ) ) ;
Set < Integer > tempSet = params . getResourceFilesList ( ) . stream ( ) . map ( t - > t . getId ( ) ) . collect ( Collectors . toSet ( ) ) ;
resourceIds . addAll ( tempSet ) ;
}
}
StringBuilder sb = new StringBuilder ( ) ;
for ( int i : resourceIds ) {
for ( int i : resourceIds ) {
if ( sb . length ( ) > 0 ) {
sb . append ( "," ) ;
}
@ -195,7 +205,7 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* query process definition list
*
* @param loginUser login user
* @param loginUser login user
* @param projectName project name
* @return definition list
* /
@ -221,12 +231,12 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* query process definition list paging
*
* @param loginUser login user
* @param loginUser login user
* @param projectName project name
* @param searchVal search value
* @param pageNo page number
* @param pageSize page size
* @param userId user id
* @param searchVal search value
* @param pageNo page number
* @param pageSize page size
* @param userId user id
* @return process definition page
* /
public Map < String , Object > queryProcessDefinitionListPaging ( User loginUser , String projectName , String searchVal , Integer pageNo , Integer pageSize , Integer userId ) {
@ -242,10 +252,10 @@ public class ProcessDefinitionService extends BaseDAGService {
Page < ProcessDefinition > page = new Page ( pageNo , pageSize ) ;
IPage < ProcessDefinition > processDefinitionIPage = processDefineMapper . queryDefineListPaging (
page , searchVal , userId , project . getId ( ) , isAdmin ( loginUser ) ) ;
page , searchVal , userId , project . getId ( ) , isAdmin ( loginUser ) ) ;
PageInfo pageInfo = new PageInfo < ProcessData > ( pageNo , pageSize ) ;
pageInfo . setTotalCount ( ( int ) processDefinitionIPage . getTotal ( ) ) ;
pageInfo . setTotalCount ( ( int ) processDefinitionIPage . getTotal ( ) ) ;
pageInfo . setLists ( processDefinitionIPage . getRecords ( ) ) ;
result . put ( Constants . DATA_LIST , pageInfo ) ;
putMsg ( result , Status . SUCCESS ) ;
@ -256,9 +266,9 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* query datail of process definition
*
* @param loginUser login user
* @param loginUser login user
* @param projectName project name
* @param processId process definition id
* @param processId process definition id
* @return process definition detail
* /
public Map < String , Object > queryProcessDefinitionById ( User loginUser , String projectName , Integer processId ) {
@ -286,12 +296,12 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* copy process definition
*
* @param loginUser login user
* @param loginUser login user
* @param projectName project name
* @param processId process definition id
* @param processId process definition id
* @return copy result code
* /
public Map < String , Object > copyProcessDefinition ( User loginUser , String projectName , Integer processId ) throws JsonProcessingException {
public Map < String , Object > copyProcessDefinition ( User loginUser , String projectName , Integer processId ) throws JsonProcessingException {
Map < String , Object > result = new HashMap < > ( 5 ) ;
Project project = projectMapper . queryByName ( projectName ) ;
@ -310,7 +320,7 @@ public class ProcessDefinitionService extends BaseDAGService {
return createProcessDefinition (
loginUser ,
projectName ,
processDefinition . getName ( ) + "_copy_" + System . currentTimeMillis ( ) ,
processDefinition . getName ( ) + "_copy_" + System . currentTimeMillis ( ) ,
processDefinition . getProcessDefinitionJson ( ) ,
processDefinition . getDescription ( ) ,
processDefinition . getLocations ( ) ,
@ -321,14 +331,14 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* update process definition
*
* @param loginUser login user
* @param projectName project name
* @param name process definition name
* @param id process definition id
* @param loginUser login user
* @param projectName project name
* @param name process definition name
* @param id process definition id
* @param processDefinitionJson process definition json
* @param desc description
* @param locations locations for nodes
* @param connects connects for nodes
* @param desc description
* @param locations locations for nodes
* @param connects connects for nodes
* @return update result code
* /
public Map < String , Object > updateProcessDefinition ( User loginUser , String projectName , int id , String name ,
@ -397,9 +407,9 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* verify process definition name unique
*
* @param loginUser login user
* @param loginUser login user
* @param projectName project name
* @param name name
* @param name name
* @return true if process definition name not exists , otherwise false
* /
public Map < String , Object > verifyProcessDefinitionName ( User loginUser , String projectName , String name ) {
@ -424,8 +434,8 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* delete process definition by id
*
* @param loginUser login user
* @param projectName project name
* @param loginUser login user
* @param projectName project name
* @param processDefinitionId process definition id
* @return delete result code
* /
@ -456,22 +466,22 @@ public class ProcessDefinitionService extends BaseDAGService {
// check process definition is already online
if ( processDefinition . getReleaseState ( ) = = ReleaseState . ONLINE ) {
putMsg ( result , Status . PROCESS_DEFINE_STATE_ONLINE , processDefinitionId ) ;
putMsg ( result , Status . PROCESS_DEFINE_STATE_ONLINE , processDefinitionId ) ;
return result ;
}
// get the timing according to the process definition
List < Schedule > schedules = scheduleMapper . queryByProcessDefinitionId ( processDefinitionId ) ;
if ( ! schedules . isEmpty ( ) & & schedules . size ( ) > 1 ) {
logger . warn ( "scheduler num is {},Greater than 1" , schedules . size ( ) ) ;
logger . warn ( "scheduler num is {},Greater than 1" , schedules . size ( ) ) ;
putMsg ( result , Status . DELETE_PROCESS_DEFINE_BY_ID_ERROR ) ;
return result ;
} else if ( schedules . size ( ) = = 1 ) {
} else if ( schedules . size ( ) = = 1 ) {
Schedule schedule = schedules . get ( 0 ) ;
if ( schedule . getReleaseState ( ) = = ReleaseState . OFFLINE ) {
if ( schedule . getReleaseState ( ) = = ReleaseState . OFFLINE ) {
scheduleMapper . deleteById ( schedule . getId ( ) ) ;
} else if ( schedule . getReleaseState ( ) = = ReleaseState . ONLINE ) {
putMsg ( result , Status . SCHEDULE_CRON_STATE_ONLINE , schedule . getId ( ) ) ;
} else if ( schedule . getReleaseState ( ) = = ReleaseState . ONLINE ) {
putMsg ( result , Status . SCHEDULE_CRON_STATE_ONLINE , schedule . getId ( ) ) ;
return result ;
}
}
@ -489,9 +499,9 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* release process definition : online / offline
*
* @param loginUser login user
* @param projectName project name
* @param id process definition id
* @param loginUser login user
* @param projectName project name
* @param id process definition id
* @param releaseState release state
* @return release result code
* /
@ -510,7 +520,7 @@ public class ProcessDefinitionService extends BaseDAGService {
// check state
if ( null = = state ) {
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , "releaseState" ) ;
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , RELEASESTATE ) ;
return result ;
}
@ -522,12 +532,12 @@ public class ProcessDefinitionService extends BaseDAGService {
String resourceIds = processDefinition . getResourceIds ( ) ;
if ( StringUtils . isNotBlank ( resourceIds ) ) {
Integer [ ] resourceIdArray = Arrays . stream ( resourceIds . split ( "," ) ) . map ( Integer : : parseInt ) . toArray ( Integer [ ] : : new ) ;
PermissionCheck < Integer > permissionCheck = new PermissionCheck ( AuthorizationType . RESOURCE_FILE_ID , processService , resourceIdArray , loginUser . getId ( ) , logger ) ;
PermissionCheck < Integer > permissionCheck = new PermissionCheck < > ( AuthorizationType . RESOURCE_FILE_ID , processService , resourceIdArray , loginUser . getId ( ) , logger ) ;
try {
permissionCheck . checkPermission ( ) ;
} catch ( Exception e ) {
logger . error ( e . getMessage ( ) , e ) ;
putMsg ( result , Status . RESOURCE_NOT_EXIST_OR_NO_PERMISSION , "releaseState" ) ;
logger . error ( e . getMessage ( ) , e ) ;
putMsg ( result , Status . RESOURCE_NOT_EXIST_OR_NO_PERMISSION , RELEASESTATE ) ;
return result ;
}
}
@ -542,7 +552,7 @@ public class ProcessDefinitionService extends BaseDAGService {
new int [ ] { processDefinition . getId ( ) }
) ;
for ( Schedule schedule : scheduleList ) {
for ( Schedule schedule : scheduleList ) {
logger . info ( "set schedule offline, project id: {}, schedule id: {}, process definition id: {}" , project . getId ( ) , schedule . getId ( ) , id ) ;
// set status
schedule . setReleaseState ( ReleaseState . OFFLINE ) ;
@ -551,7 +561,7 @@ public class ProcessDefinitionService extends BaseDAGService {
}
break ;
default :
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , "releaseState" ) ;
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , RELEASESTATE ) ;
return result ;
}
@ -561,14 +571,15 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* batch export process definition by ids
*
* @param loginUser
* @param projectName
* @param processDefinitionIds
* @param response
* /
public void batchExportProcessDefinitionByIds ( User loginUser , String projectName , String processDefinitionIds , HttpServletResponse response ) {
public void batchExportProcessDefinitionByIds ( User loginUser , String projectName , String processDefinitionIds , HttpServletResponse response ) {
if ( StringUtils . isEmpty ( processDefinitionIds ) ) {
if ( StringUtils . isEmpty ( processDefinitionIds ) ) {
return ;
}
@ -579,24 +590,25 @@ public class ProcessDefinitionService extends BaseDAGService {
Map < String , Object > checkResult = projectService . checkProjectAndAuth ( loginUser , project , projectName ) ;
Status resultStatus = ( Status ) checkResult . get ( Constants . STATUS ) ;
if ( resultStatus ! = Status . SUCCESS ) {
if ( resultStatus ! = Status . SUCCESS ) {
return ;
}
List < ProcessMeta > processDefinitionList =
getProcessDefinitionList ( processDefinitionIds ) ;
if ( CollectionUtils . isNotEmpty ( processDefinitionList ) ) {
if ( CollectionUtils . isNotEmpty ( processDefinitionList ) ) {
downloadProcessDefinitionFile ( response , processDefinitionList ) ;
}
}
/ * *
* get process definition list by ids
*
* @param processDefinitionIds
* @return
* /
private List < ProcessMeta > getProcessDefinitionList ( String processDefinitionIds ) {
private List < ProcessMeta > getProcessDefinitionList ( String processDefinitionIds ) {
List < ProcessMeta > processDefinitionList = new ArrayList < > ( ) ;
String [ ] processDefinitionIdArray = processDefinitionIds . split ( "," ) ;
for ( String strProcessDefinitionId : processDefinitionIdArray ) {
@ -613,6 +625,7 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* download the process definition file
*
* @param response
* @param processDefinitionList
* /
@ -628,7 +641,7 @@ public class ProcessDefinitionService extends BaseDAGService {
buff . close ( ) ;
} catch ( IOException e ) {
logger . warn ( "export process fail" , e ) ;
} finally {
} finally {
if ( null ! = buff ) {
try {
buff . close ( ) ;
@ -648,19 +661,21 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* get export process metadata string
*
* @param processDefinitionId process definition id
* @param processDefinition process definition
* @param processDefinition process definition
* @return export process metadata string
* /
public String exportProcessMetaDataStr ( Integer processDefinitionId , ProcessDefinition processDefinition ) {
//create workflow json file
return JSONUtils . toJsonString ( exportProcessMetaData ( processDefinitionId , processDefinition ) ) ;
return JSONUtils . toJsonString ( exportProcessMetaData ( processDefinitionId , processDefinition ) ) ;
}
/ * *
* get export process metadata string
*
* @param processDefinitionId process definition id
* @param processDefinition process definition
* @param processDefinition process definition
* @return export process metadata string
* /
public ProcessMeta exportProcessMetaData ( Integer processDefinitionId , ProcessDefinition processDefinition ) {
@ -696,17 +711,18 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* correct task param which has datasource or dependent
*
* @param processDefinitionJson processDefinitionJson
* @return correct processDefinitionJson
* /
public String addExportTaskNodeSpecialParam ( String processDefinitionJson ) {
JSON Object jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
JSON Array jsonArray = ( JSON Array) jsonObject . get ( "tasks" ) ;
ObjectNode jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
ArrayNode jsonArray = ( ArrayNode ) jsonObject . path ( TASKS ) ;
for ( int i = 0 ; i < jsonArray . size ( ) ; i + + ) {
JSONObject taskNode = jsonArray . getJSONObject ( i ) ;
if ( StringUtils . isNotEmpty ( taskNode . getString ( "type" ) ) ) {
String taskType = taskNode . getString ( "type" ) ;
JsonNode taskNode = jsonArray . path ( i ) ;
if ( StringUtils . isNotEmpty ( taskNode . path ( "type" ) . asText ( ) ) ) {
String taskType = taskNode . path ( "type" ) . asText ( ) ;
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory . getByTaskType ( taskType ) ;
if ( null ! = addTaskParam ) {
@ -714,12 +730,13 @@ public class ProcessDefinitionService extends BaseDAGService {
}
}
}
jsonObject . put ( "tasks" , jsonArray ) ;
jsonObject . set ( TASKS , jsonArray ) ;
return jsonObject . toString ( ) ;
}
/ * *
* check task if has sub process
*
* @param taskType task type
* @return if task has sub process return true else false
* /
@ -729,8 +746,9 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* import process definition
* @param loginUser login user
* @param file process metadata json file
*
* @param loginUser login user
* @param file process metadata json file
* @param currentProjectName current project name
* @return import process
* /
@ -738,7 +756,7 @@ public class ProcessDefinitionService extends BaseDAGService {
public Map < String , Object > importProcessDefinition ( User loginUser , MultipartFile file , String currentProjectName ) {
Map < String , Object > result = new HashMap < > ( 5 ) ;
String processMetaJson = FileUtils . file2String ( file ) ;
List < ProcessMeta > processMetaList = JSON . parseArray ( processMetaJson , ProcessMeta . class ) ;
List < ProcessMeta > processMetaList = JSON . parseArray ( processMetaJson , ProcessMeta . class ) ;
//check file content
if ( CollectionUtils . isEmpty ( processMetaList ) ) {
@ -746,9 +764,9 @@ public class ProcessDefinitionService extends BaseDAGService {
return result ;
}
for ( ProcessMeta processMeta : processMetaList ) {
for ( ProcessMeta processMeta : processMetaList ) {
if ( ! checkAndImportProcessDefinition ( loginUser , currentProjectName , result , processMeta ) ) {
if ( ! checkAndImportProcessDefinition ( loginUser , currentProjectName , result , processMeta ) ) {
return result ;
}
}
@ -758,6 +776,7 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* check and import process definition
*
* @param loginUser
* @param currentProjectName
* @param result
@ -766,7 +785,7 @@ public class ProcessDefinitionService extends BaseDAGService {
* /
private boolean checkAndImportProcessDefinition ( User loginUser , String currentProjectName , Map < String , Object > result , ProcessMeta processMeta ) {
if ( ! checkImportanceParams ( processMeta , result ) ) {
if ( ! checkImportanceParams ( processMeta , result ) ) {
return false ;
}
@ -774,7 +793,7 @@ public class ProcessDefinitionService extends BaseDAGService {
String processDefinitionName = processMeta . getProcessDefinitionName ( ) ;
//use currentProjectName to query
Project targetProject = projectMapper . queryByName ( currentProjectName ) ;
if ( null ! = targetProject ) {
if ( null ! = targetProject ) {
processDefinitionName = recursionProcessDefinitionName ( targetProject . getId ( ) ,
processDefinitionName , 1 ) ;
}
@ -798,14 +817,14 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefinitionName ,
addImportTaskNodeParam ( loginUser , processMeta . getProcessDefinitionJson ( ) , targetProject ) ) ;
if ( createProcessResult = = null ) {
if ( createProcessResult = = null ) {
return false ;
}
//create process definition
Integer processDefinitionId =
Objects . isNull ( createProcessResult . get ( "processDefinitionId" ) ) ?
null : Integer . parseInt ( createProcessResult . get ( "processDefinitionId" ) . toString ( ) ) ;
Objects . isNull ( createProcessResult . get ( PROCESSDEFINITIONID ) ) ?
null : Integer . parseInt ( createProcessResult . get ( PROCESSDEFINITIONID ) . toString ( ) ) ;
//scheduler param
return getImportProcessScheduleResult ( loginUser ,
@ -819,6 +838,7 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* get create process result
*
* @param loginUser
* @param currentProjectName
* @param result
@ -832,12 +852,12 @@ public class ProcessDefinitionService extends BaseDAGService {
Map < String , Object > result ,
ProcessMeta processMeta ,
String processDefinitionName ,
String importProcessParam ) {
String importProcessParam ) {
Map < String , Object > createProcessResult = null ;
try {
createProcessResult = createProcessDefinition ( loginUser
, currentProjectName ,
processDefinitionName + "_import_" + System . currentTimeMillis ( ) ,
, currentProjectName ,
processDefinitionName + "_import_" + System . currentTimeMillis ( ) ,
importProcessParam ,
processMeta . getProcessDefinitionDescription ( ) ,
processMeta . getProcessDefinitionLocations ( ) ,
@ -853,6 +873,7 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* get import process schedule result
*
* @param loginUser
* @param currentProjectName
* @param result
@ -884,11 +905,12 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* check importance params
*
* @param processMeta
* @param result
* @return
* /
private boolean checkImportanceParams ( ProcessMeta processMeta , Map < String , Object > result ) {
private boolean checkImportanceParams ( ProcessMeta processMeta , Map < String , Object > result ) {
if ( StringUtils . isEmpty ( processMeta . getProjectName ( ) ) ) {
putMsg ( result , Status . DATA_IS_NULL , "projectName" ) ;
return false ;
@ -907,18 +929,19 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* import process add special task param
* @param loginUser login user
*
* @param loginUser login user
* @param processDefinitionJson process definition json
* @param targetProject target project
* @param targetProject target project
* @return import process param
* /
private String addImportTaskNodeParam ( User loginUser , String processDefinitionJson , Project targetProject ) {
JSON Object jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
JSON Array jsonArray = ( JSON Array) jsonObject . get ( "tasks" ) ;
ObjectNode jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
ArrayNode jsonArray = ( ArrayNode ) jsonObject . get ( TASKS ) ;
//add sql and dependent param
for ( int i = 0 ; i < jsonArray . size ( ) ; i + + ) {
JSONObject taskNode = jsonArray . getJSONObject ( i ) ;
String taskType = taskNode . getString ( "type" ) ;
JsonNode taskNode = jsonArray . path ( i ) ;
String taskType = taskNode . path ( "type" ) . asText ( ) ;
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory . getByTaskType ( taskType ) ;
if ( null ! = addTaskParam ) {
addTaskParam . addImportSpecialParam ( taskNode ) ;
@ -928,25 +951,26 @@ public class ProcessDefinitionService extends BaseDAGService {
//recursive sub-process parameter correction map key for old process id value for new process id
Map < Integer , Integer > subProcessIdMap = new HashMap < > ( 20 ) ;
List < Object > subProcessList = jsonArray . stream ( )
. filter ( elem - > checkTaskHasSubProcess ( JSONUtils . parseObject ( elem . toString ( ) ) . getString ( "type" ) ) )
List < Object > subProcessList = StreamUtils . asStream ( jsonArray . elements ( ) )
. filter ( elem - > checkTaskHasSubProcess ( JSONUtils . parseObject ( elem . toString ( ) ) . path ( "type" ) . asText ( ) ) )
. collect ( Collectors . toList ( ) ) ;
if ( CollectionUtils . isNotEmpty ( subProcessList ) ) {
importSubProcess ( loginUser , targetProject , jsonArray , subProcessIdMap ) ;
}
jsonObject . put ( "tasks" , jsonArray ) ;
jsonObject . set ( TASKS , jsonArray ) ;
return jsonObject . toString ( ) ;
}
/ * *
* import process schedule
* @param loginUser login user
* @param currentProjectName current project name
* @param processMeta process meta data
*
* @param loginUser login user
* @param currentProjectName current project name
* @param processMeta process meta data
* @param processDefinitionName process definition name
* @param processDefinitionId process definition id
* @param processDefinitionId process definition id
* @return insert schedule flag
* /
public int importProcessSchedule ( User loginUser , String currentProjectName , ProcessMeta processMeta ,
@ -995,84 +1019,87 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* check import process has sub process
* recursion create sub process
* @param loginUser login user
* @param targetProject target project
* @param jsonArray process task array
*
* @param loginUser login user
* @param targetProject target project
* @param jsonArray process task array
* @param subProcessIdMap correct sub process id map
* /
public void importSubProcess ( User loginUser , Project targetProject , JSON Array jsonArray , Map < Integer , Integer > subProcessIdMap ) {
public void importSubProcess ( User loginUser , Project targetProject , ArrayNode jsonArray , Map < Integer , Integer > subProcessIdMap ) {
for ( int i = 0 ; i < jsonArray . size ( ) ; i + + ) {
JSONObject taskNode = jsonArray . getJSONObject ( i ) ;
String taskType = taskNode . getString ( "type" ) ;
if ( checkTaskHasSubProcess ( taskType ) ) {
//get sub process info
JSONObject subParams = JSONUtils . parseObject ( taskNode . getString ( "params" ) ) ;
Integer subProcessId = subParams . getInteger ( "processDefinitionId" ) ;
ProcessDefinition subProcess = processDefineMapper . queryByDefineId ( subProcessId ) ;
//check is sub process exist in db
if ( null ! = subProcess ) {
String subProcessJson = subProcess . getProcessDefinitionJson ( ) ;
//check current project has sub process
ProcessDefinition currentProjectSubProcess = processDefineMapper . queryByDefineName ( targetProject . getId ( ) , subProcess . getName ( ) ) ;
if ( null = = currentProjectSubProcess ) {
JSONArray subJsonArray = ( JSONArray ) JSONUtils . parseObject ( subProcess . getProcessDefinitionJson ( ) ) . get ( "tasks" ) ;
List < Object > subProcessList = subJsonArray . stream ( )
. filter ( item - > checkTaskHasSubProcess ( JSONUtils . parseObject ( item . toString ( ) ) . getString ( "type" ) ) )
. collect ( Collectors . toList ( ) ) ;
if ( CollectionUtils . isNotEmpty ( subProcessList ) ) {
importSubProcess ( loginUser , targetProject , subJsonArray , subProcessIdMap ) ;
//sub process processId correct
if ( ! subProcessIdMap . isEmpty ( ) ) {
for ( Map . Entry < Integer , Integer > entry : subProcessIdMap . entrySet ( ) ) {
String oldSubProcessId = "\"processDefinitionId\":" + entry . getKey ( ) ;
String newSubProcessId = "\"processDefinitionId\":" + entry . getValue ( ) ;
subProcessJson = subProcessJson . replaceAll ( oldSubProcessId , newSubProcessId ) ;
}
subProcessIdMap . clear ( ) ;
}
}
ObjectNode taskNode = ( ObjectNode ) jsonArray . path ( i ) ;
String taskType = taskNode . path ( "type" ) . asText ( ) ;
//if sub-process recursion
Date now = new Date ( ) ;
//create sub process in target project
ProcessDefinition processDefine = new ProcessDefinition ( ) ;
processDefine . setName ( subProcess . getName ( ) ) ;
processDefine . setVersion ( subProcess . getVersion ( ) ) ;
processDefine . setReleaseState ( subProcess . getReleaseState ( ) ) ;
processDefine . setProjectId ( targetProject . getId ( ) ) ;
processDefine . setUserId ( loginUser . getId ( ) ) ;
processDefine . setProcessDefinitionJson ( subProcessJson ) ;
processDefine . setDescription ( subProcess . getDescription ( ) ) ;
processDefine . setLocations ( subProcess . getLocations ( ) ) ;
processDefine . setConnects ( subProcess . getConnects ( ) ) ;
processDefine . setTimeout ( subProcess . getTimeout ( ) ) ;
processDefine . setTenantId ( subProcess . getTenantId ( ) ) ;
processDefine . setGlobalParams ( subProcess . getGlobalParams ( ) ) ;
processDefine . setCreateTime ( now ) ;
processDefine . setUpdateTime ( now ) ;
processDefine . setFlag ( subProcess . getFlag ( ) ) ;
processDefine . setReceivers ( subProcess . getReceivers ( ) ) ;
processDefine . setReceiversCc ( subProcess . getReceiversCc ( ) ) ;
processDefineMapper . insert ( processDefine ) ;
logger . info ( "create sub process, project: {}, process name: {}" , targetProject . getName ( ) , processDefine . getName ( ) ) ;
//modify task node
ProcessDefinition newSubProcessDefine = processDefineMapper . queryByDefineName ( processDefine . getProjectId ( ) , processDefine . getName ( ) ) ;
if ( null ! = newSubProcessDefine ) {
subProcessIdMap . put ( subProcessId , newSubProcessDefine . getId ( ) ) ;
subParams . put ( "processDefinitionId" , newSubProcessDefine . getId ( ) ) ;
taskNode . put ( "params" , subParams ) ;
if ( ! checkTaskHasSubProcess ( taskType ) ) {
continue ;
}
//get sub process info
ObjectNode subParams = ( ObjectNode ) taskNode . path ( "params" ) ;
Integer subProcessId = subParams . path ( PROCESSDEFINITIONID ) . asInt ( ) ;
ProcessDefinition subProcess = processDefineMapper . queryByDefineId ( subProcessId ) ;
//check is sub process exist in db
if ( null = = subProcess ) {
continue ;
}
String subProcessJson = subProcess . getProcessDefinitionJson ( ) ;
//check current project has sub process
ProcessDefinition currentProjectSubProcess = processDefineMapper . queryByDefineName ( targetProject . getId ( ) , subProcess . getName ( ) ) ;
if ( null = = currentProjectSubProcess ) {
ArrayNode subJsonArray = ( ArrayNode ) JSONUtils . parseObject ( subProcess . getProcessDefinitionJson ( ) ) . get ( TASKS ) ;
List < Object > subProcessList = StreamUtils . asStream ( subJsonArray . elements ( ) )
. filter ( item - > checkTaskHasSubProcess ( JSONUtils . parseObject ( item . toString ( ) ) . path ( "type" ) . asText ( ) ) )
. collect ( Collectors . toList ( ) ) ;
if ( CollectionUtils . isNotEmpty ( subProcessList ) ) {
importSubProcess ( loginUser , targetProject , subJsonArray , subProcessIdMap ) ;
//sub process processId correct
if ( ! subProcessIdMap . isEmpty ( ) ) {
for ( Map . Entry < Integer , Integer > entry : subProcessIdMap . entrySet ( ) ) {
String oldSubProcessId = "\"processDefinitionId\":" + entry . getKey ( ) ;
String newSubProcessId = "\"processDefinitionId\":" + entry . getValue ( ) ;
subProcessJson = subProcessJson . replaceAll ( oldSubProcessId , newSubProcessId ) ;
}
subProcessIdMap . clear ( ) ;
}
}
//if sub-process recursion
Date now = new Date ( ) ;
//create sub process in target project
ProcessDefinition processDefine = new ProcessDefinition ( ) ;
processDefine . setName ( subProcess . getName ( ) ) ;
processDefine . setVersion ( subProcess . getVersion ( ) ) ;
processDefine . setReleaseState ( subProcess . getReleaseState ( ) ) ;
processDefine . setProjectId ( targetProject . getId ( ) ) ;
processDefine . setUserId ( loginUser . getId ( ) ) ;
processDefine . setProcessDefinitionJson ( subProcessJson ) ;
processDefine . setDescription ( subProcess . getDescription ( ) ) ;
processDefine . setLocations ( subProcess . getLocations ( ) ) ;
processDefine . setConnects ( subProcess . getConnects ( ) ) ;
processDefine . setTimeout ( subProcess . getTimeout ( ) ) ;
processDefine . setTenantId ( subProcess . getTenantId ( ) ) ;
processDefine . setGlobalParams ( subProcess . getGlobalParams ( ) ) ;
processDefine . setCreateTime ( now ) ;
processDefine . setUpdateTime ( now ) ;
processDefine . setFlag ( subProcess . getFlag ( ) ) ;
processDefine . setReceivers ( subProcess . getReceivers ( ) ) ;
processDefine . setReceiversCc ( subProcess . getReceiversCc ( ) ) ;
processDefineMapper . insert ( processDefine ) ;
logger . info ( "create sub process, project: {}, process name: {}" , targetProject . getName ( ) , processDefine . getName ( ) ) ;
//modify task node
ProcessDefinition newSubProcessDefine = processDefineMapper . queryByDefineName ( processDefine . getProjectId ( ) , processDefine . getName ( ) ) ;
if ( null ! = newSubProcessDefine ) {
subProcessIdMap . put ( subProcessId , newSubProcessDefine . getId ( ) ) ;
subParams . put ( PROCESSDEFINITIONID , newSubProcessDefine . getId ( ) ) ;
taskNode . set ( "params" , subParams ) ;
}
}
}
}
@ -1081,7 +1108,7 @@ public class ProcessDefinitionService extends BaseDAGService {
/ * *
* check the process definition node meets the specifications
*
* @param processData process data
* @param processData process data
* @param processDefinitionJson process definition json
* @return check result code
* /
@ -1091,7 +1118,7 @@ public class ProcessDefinitionService extends BaseDAGService {
try {
if ( processData = = null ) {
logger . error ( "process data is null" ) ;
putMsg ( result , Status . DATA_IS_NOT_VALID , processDefinitionJson ) ;
putMsg ( result , Status . DATA_IS_NOT_VALID , processDefinitionJson ) ;
return result ;
}
@ -1122,7 +1149,7 @@ public class ProcessDefinitionService extends BaseDAGService {
// check extra params
CheckUtils . checkOtherParams ( taskNode . getExtras ( ) ) ;
}
putMsg ( result , Status . SUCCESS ) ;
putMsg ( result , Status . SUCCESS ) ;
} catch ( Exception e ) {
result . put ( Constants . STATUS , Status . REQUEST_PARAMS_NOT_VALID_ERROR ) ;
result . put ( Constants . MSG , e . getMessage ( ) ) ;
@ -1135,9 +1162,8 @@ public class ProcessDefinitionService extends BaseDAGService {
*
* @param defineId define id
* @return task node list
* @throws Exception exception
* /
public Map < String , Object > getTaskNodeListByDefinitionId ( Integer defineId ) throws Exception {
public Map < String , Object > getTaskNodeListByDefinitionId ( Integer defineId ) {
Map < String , Object > result = new HashMap < > ( ) ;
ProcessDefinition processDefinition = processDefineMapper . selectById ( defineId ) ;
@ -1155,7 +1181,7 @@ public class ProcessDefinitionService extends BaseDAGService {
//process data check
if ( null = = processData ) {
logger . error ( "process data is null" ) ;
putMsg ( result , Status . DATA_IS_NOT_VALID , processDefinitionJson ) ;
putMsg ( result , Status . DATA_IS_NOT_VALID , processDefinitionJson ) ;
return result ;
}
@ -1173,15 +1199,14 @@ public class ProcessDefinitionService extends BaseDAGService {
*
* @param defineIdList define id list
* @return task node list
* @throws Exception exception
* /
public Map < String , Object > getTaskNodeListByDefinitionIdList ( String defineIdList ) throws Exception {
public Map < String , Object > getTaskNodeListByDefinitionIdList ( String defineIdList ) {
Map < String , Object > result = new HashMap < > ( ) ;
Map < Integer , List < TaskNode > > taskNodeMap = new HashMap < > ( ) ;
String [ ] idList = defineIdList . split ( "," ) ;
List < Integer > idIntList = new ArrayList < > ( ) ;
for ( String definitionId : idList ) {
for ( String definitionId : idList ) {
idIntList . add ( Integer . parseInt ( definitionId ) ) ;
}
Integer [ ] idArray = idIntList . toArray ( new Integer [ idIntList . size ( ) ] ) ;
@ -1192,7 +1217,7 @@ public class ProcessDefinitionService extends BaseDAGService {
return result ;
}
for ( ProcessDefinition processDefinition : processDefinitionList ) {
for ( ProcessDefinition processDefinition : processDefinitionList ) {
String processDefinitionJson = processDefinition . getProcessDefinitionJson ( ) ;
ProcessData processData = JSONUtils . parseObject ( processDefinitionJson , ProcessData . class ) ;
List < TaskNode > taskNodeList = ( processData . getTasks ( ) = = null ) ? new ArrayList < > ( ) : processData . getTasks ( ) ;
@ -1228,7 +1253,7 @@ public class ProcessDefinitionService extends BaseDAGService {
* Encapsulates the TreeView structure
*
* @param processId process definition id
* @param limit limit
* @param limit limit
* @return tree view json data
* @throws Exception exception
* /
@ -1238,7 +1263,7 @@ public class ProcessDefinitionService extends BaseDAGService {
ProcessDefinition processDefinition = processDefineMapper . selectById ( processId ) ;
if ( null = = processDefinition ) {
logger . info ( "process define not exists" ) ;
putMsg ( result , Status . PROCESS_DEFINE_NOT_EXIST , processDefinition ) ;
putMsg ( result , Status . PROCESS_DEFINE_NOT_EXIST , processDefinition ) ;
return result ;
}
DAG < String , TaskNode , TaskNodeRelation > dag = genDagGraph ( processDefinition ) ;
@ -1257,8 +1282,8 @@ public class ProcessDefinitionService extends BaseDAGService {
* /
List < ProcessInstance > processInstanceList = processInstanceMapper . queryByProcessDefineId ( processId , limit ) ;
for ( ProcessInstance processInstance : processInstanceList ) {
processInstance . setDuration ( DateUtils . differSec ( processInstance . getStartTime ( ) , processInstance . getEndTime ( ) ) ) ;
for ( ProcessInstance processInstance : processInstanceList ) {
processInstance . setDuration ( DateUtils . differSec ( processInstance . getStartTime ( ) , processInstance . getEndTime ( ) ) ) ;
}
if ( limit > processInstanceList . size ( ) ) {
@ -1361,9 +1386,8 @@ public class ProcessDefinitionService extends BaseDAGService {
*
* @param processDefinition process definition
* @return dag graph
* @throws Exception if exception happens
* /
private DAG < String , TaskNode , TaskNodeRelation > genDagGraph ( ProcessDefinition processDefinition ) throws Exception {
private DAG < String , TaskNode , TaskNodeRelation > genDagGraph ( ProcessDefinition processDefinition ) {
String processDefinitionJson = processDefinition . getProcessDefinitionJson ( ) ;
@ -1383,8 +1407,6 @@ public class ProcessDefinitionService extends BaseDAGService {
}
/ * *
* whether the graph has a ring
*
@ -1402,7 +1424,7 @@ public class ProcessDefinitionService extends BaseDAGService {
// Fill edge relations
for ( TaskNode taskNodeResponse : taskNodeResponseList ) {
taskNodeResponse . getPreTasks ( ) ;
List < String > preTasks = JSONUtils . toList ( taskNodeResponse . getPreTasks ( ) , String . class ) ;
List < String > preTasks = JSONUtils . toList ( taskNodeResponse . getPreTasks ( ) , String . class ) ;
if ( CollectionUtils . isNotEmpty ( preTasks ) ) {
for ( String preTask : preTasks ) {
if ( ! graph . addEdge ( preTask , taskNodeResponse . getName ( ) ) ) {
@ -1415,19 +1437,19 @@ public class ProcessDefinitionService extends BaseDAGService {
return graph . hasCycle ( ) ;
}
private String recursionProcessDefinitionName ( Integer projectId , String processDefinitionName , int num ) {
private String recursionProcessDefinitionName ( Integer projectId , String processDefinitionName , int num ) {
ProcessDefinition processDefinition = processDefineMapper . queryByDefineName ( projectId , processDefinitionName ) ;
if ( processDefinition ! = null ) {
if ( num > 1 ) {
String str = processDefinitionName . substring ( 0 , processDefinitionName . length ( ) - 3 ) ;
processDefinitionName = str + "(" + num + ")" ;
} else {
processDefinitionName = processDefinition . getName ( ) + "(" + num + ")" ;
if ( num > 1 ) {
String str = processDefinitionName . substring ( 0 , processDefinitionName . length ( ) - 3 ) ;
processDefinitionName = str + "(" + num + ")" ;
} else {
processDefinitionName = processDefinition . getName ( ) + "(" + num + ")" ;
}
} else {
} else {
return processDefinitionName ;
}
return recursionProcessDefinitionName ( projectId , processDefinitionName , num + 1 ) ;
return recursionProcessDefinitionName ( projectId , processDefinitionName , num + 1 ) ;
}
}