@ -16,9 +16,16 @@
* /
package org.apache.dolphinscheduler.api.service ;
import com.alibaba.fastjson.JSON ;
import com.alibaba.fastjson.JSONArray ;
import com.alibaba.fastjson.JSONObject ;
import com.baomidou.mybatisplus.core.metadata.IPage ;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page ;
import com.fasterxml.jackson.core.JsonProcessingException ;
import org.apache.dolphinscheduler.api.dto.treeview.Instance ;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto ;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.api.utils.CheckUtils ;
import org.apache.dolphinscheduler.api.utils.PageInfo ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.* ;
@ -31,15 +38,8 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils ;
import org.apache.dolphinscheduler.common.utils.DateUtils ;
import org.apache.dolphinscheduler.common.utils.JSONUtils ;
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.apache.dolphinscheduler.dao.ProcessDao ;
import com.alibaba.fastjson.JSON ;
import com.alibaba.fastjson.JSONArray ;
import com.alibaba.fastjson.JSONObject ;
import com.baomidou.mybatisplus.core.metadata.IPage ;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page ;
import com.fasterxml.jackson.core.JsonProcessingException ;
import org.apache.commons.lang3.ObjectUtils ;
import org.apache.dolphinscheduler.api.utils.CheckUtils ;
import org.apache.dolphinscheduler.dao.entity.* ;
import org.apache.dolphinscheduler.dao.mapper.* ;
import org.slf4j.Logger ;
@ -56,8 +56,10 @@ import java.io.BufferedOutputStream;
import java.io.BufferedReader ;
import java.io.IOException ;
import java.io.InputStreamReader ;
import java.nio.charset.StandardCharsets ;
import java.util.* ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.stream.Collectors ;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID ;
@ -482,50 +484,21 @@ public class ProcessDefinitionService extends BaseDAGService {
* @param response response
* /
public void exportProcessDefinitionById ( User loginUser , String projectName , Integer processDefinitionId , HttpServletResponse response ) {
//export project info
Project project = projectMapper . queryByName ( projectName ) ;
//check user access for project
Map < String , Object > checkResult = projectService . checkProjectAndAuth ( loginUser , project , projectName ) ;
Status resultStatus = ( Status ) checkResult . get ( Constants . STATUS ) ;
if ( resultStatus = = Status . SUCCESS ) {
//get workflow info
ProcessDefinition processDefinition = processDefineMapper . queryByDefineId ( processDefinitionId ) ;
if ( processDefinition ! = null ) {
JSONObject jsonObject = JSONUtils . parseObject ( processDefinition . getProcessDefinitionJson ( ) ) ;
JSONArray jsonArray = ( JSONArray ) jsonObject . get ( "tasks" ) ;
for ( int i = 0 ; i < jsonArray . size ( ) ; i + + ) {
JSONObject taskNode = jsonArray . getJSONObject ( i ) ;
if ( taskNode . get ( "type" ) ! = null & & taskNode . get ( "type" ) ! = "" ) {
String taskType = taskNode . getString ( "type" ) ;
if ( taskType . equals ( TaskType . SQL . name ( ) ) | | taskType . equals ( TaskType . PROCEDURE . name ( ) ) ) {
JSONObject sqlParameters = JSONUtils . parseObject ( taskNode . getString ( "params" ) ) ;
DataSource dataSource = dataSourceMapper . selectById ( ( Integer ) sqlParameters . get ( "datasource" ) ) ;
if ( dataSource ! = null ) {
sqlParameters . put ( "datasourceName" , dataSource . getName ( ) ) ;
}
taskNode . put ( "params" , sqlParameters ) ;
} else if ( taskType . equals ( TaskType . DEPENDENT . name ( ) ) ) {
JSONObject dependentParameters = JSONUtils . parseObject ( taskNode . getString ( "dependence" ) ) ;
if ( dependentParameters ! = null ) {
JSONArray dependTaskList = ( JSONArray ) dependentParameters . get ( "dependTaskList" ) ;
for ( int j = 0 ; j < dependTaskList . size ( ) ; j + + ) {
JSONObject dependentTaskModel = dependTaskList . getJSONObject ( j ) ;
JSONArray dependItemList = ( JSONArray ) dependentTaskModel . get ( "dependItemList" ) ;
for ( int k = 0 ; k < dependItemList . size ( ) ; k + + ) {
JSONObject dependentItem = dependItemList . getJSONObject ( k ) ;
int definitionId = dependentItem . getInteger ( "definitionId" ) ;
ProcessDefinition definition = processDefineMapper . queryByDefineId ( definitionId ) ;
if ( definition ! = null ) {
dependentItem . put ( "projectName" , definition . getProjectName ( ) ) ;
dependentItem . put ( "definitionName" , definition . getName ( ) ) ;
}
}
}
taskNode . put ( "dependence" , dependentParameters ) ;
}
}
}
}
jsonObject . put ( "tasks" , jsonArray ) ;
processDefinition . setProcessDefinitionJson ( jsonObject . toString ( ) ) ;
if ( null ! = processDefinition ) {
//correct task param which has data source or dependent param
String correctProcessDefinitionJson = addTaskNodeSpecialParam ( processDefinition . getProcessDefinitionJson ( ) ) ;
processDefinition . setProcessDefinitionJson ( correctProcessDefinitionJson ) ;
Map < String , Object > row = new LinkedHashMap < > ( ) ;
row . put ( "projectName" , processDefinition . getProjectName ( ) ) ;
@ -535,8 +508,9 @@ public class ProcessDefinitionService extends BaseDAGService {
row . put ( "processDefinitionLocations" , processDefinition . getLocations ( ) ) ;
row . put ( "processDefinitionConnects" , processDefinition . getConnects ( ) ) ;
//schedule info
List < Schedule > schedules = scheduleMapper . queryByProcessDefinitionId ( processDefinitionId ) ;
if ( schedules . size ( ) > 0 ) {
if ( ! schedules . isEmpty ( ) ) {
Schedule schedule = schedules . get ( 0 ) ;
row . put ( "scheduleWarningType" , schedule . getWarningType ( ) ) ;
row . put ( "scheduleWarningGroupId" , schedule . getWarningGroupId ( ) ) ;
@ -556,6 +530,8 @@ public class ProcessDefinitionService extends BaseDAGService {
}
}
//create workflow json file
String rowsJson = JSONUtils . toJsonString ( row ) ;
response . setContentType ( MediaType . APPLICATION_JSON_UTF8_VALUE ) ;
response . setHeader ( "Content-Disposition" , "attachment;filename=" + processDefinition . getName ( ) + ".json" ) ;
@ -564,38 +540,136 @@ public class ProcessDefinitionService extends BaseDAGService {
try {
out = response . getOutputStream ( ) ;
buff = new BufferedOutputStream ( out ) ;
buff . write ( rowsJson . getBytes ( "UTF-8" ) ) ;
buff . write ( rowsJson . getBytes ( StandardCharsets . UTF_8 ) ) ;
buff . flush ( ) ;
buff . close ( ) ;
} catch ( IOException e ) {
e . printStackTrace ( ) ;
logger . warn ( "export process fail" , e ) ;
} finally {
try {
buff . close ( ) ;
out . close ( ) ;
} catch ( Exception e ) {
e . printStackTrace ( ) ;
if ( null ! = buff ) {
try {
buff . close ( ) ;
} catch ( Exception e ) {
logger . warn ( "export process buffer not close" , e ) ;
}
}
if ( null ! = out ) {
try {
out . close ( ) ;
} catch ( Exception e ) {
logger . warn ( "export process output stream not close" , e ) ;
}
}
}
}
}
}
/ * *
* correct task param which has datasource or dependent
* @param processDefinitionJson processDefinitionJson
* @return correct processDefinitionJson
* /
public String addTaskNodeSpecialParam ( String processDefinitionJson ) {
JSONObject jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
JSONArray jsonArray = ( JSONArray ) jsonObject . get ( "tasks" ) ;
for ( int i = 0 ; i < jsonArray . size ( ) ; i + + ) {
JSONObject taskNode = jsonArray . getJSONObject ( i ) ;
if ( StringUtils . isNotEmpty ( taskNode . getString ( "type" ) ) ) {
String taskType = taskNode . getString ( "type" ) ;
if ( checkTaskHasDataSource ( taskType ) ) {
// add sqlParameters
JSONObject sqlParameters = JSONUtils . parseObject ( taskNode . getString ( "params" ) ) ;
DataSource dataSource = dataSourceMapper . selectById ( ( Integer ) sqlParameters . get ( "datasource" ) ) ;
if ( null ! = dataSource ) {
sqlParameters . put ( "datasourceName" , dataSource . getName ( ) ) ;
}
taskNode . put ( "params" , sqlParameters ) ;
} else if ( checkTaskHasDependent ( taskType ) ) {
// add dependent param
JSONObject dependentParameters = JSONUtils . parseObject ( taskNode . getString ( "dependence" ) ) ;
if ( null ! = dependentParameters ) {
JSONArray dependTaskList = ( JSONArray ) dependentParameters . get ( "dependTaskList" ) ;
for ( int j = 0 ; j < dependTaskList . size ( ) ; j + + ) {
JSONObject dependentTaskModel = dependTaskList . getJSONObject ( j ) ;
JSONArray dependItemList = ( JSONArray ) dependentTaskModel . get ( "dependItemList" ) ;
for ( int k = 0 ; k < dependItemList . size ( ) ; k + + ) {
JSONObject dependentItem = dependItemList . getJSONObject ( k ) ;
int definitionId = dependentItem . getInteger ( "definitionId" ) ;
ProcessDefinition definition = processDefineMapper . queryByDefineId ( definitionId ) ;
if ( null ! = definition ) {
dependentItem . put ( "projectName" , definition . getProjectName ( ) ) ;
dependentItem . put ( "definitionName" , definition . getName ( ) ) ;
}
}
}
taskNode . put ( "dependence" , dependentParameters ) ;
}
}
}
}
jsonObject . put ( "tasks" , jsonArray ) ;
return jsonObject . toString ( ) ;
}
/ * *
* check task if has dependent
* @param taskType task type
* @return if task has dependent return true else false
* /
private boolean checkTaskHasDependent ( String taskType ) {
return taskType . equals ( TaskType . DEPENDENT . name ( ) ) ;
}
/ * *
* check task if has data source info
* @param taskType task type
* @return if task has data source return true else false
* /
private boolean checkTaskHasDataSource ( String taskType ) {
return taskType . equals ( TaskType . SQL . name ( ) ) | | taskType . equals ( TaskType . PROCEDURE . name ( ) ) ;
}
/ * *
* check task if has sub process
* @param taskType task type
* @return if task has sub process return true else false
* /
private boolean checkTaskHasSubProcess ( String taskType ) {
return taskType . equals ( TaskType . SUB_PROCESS . name ( ) ) ;
}
/ * *
* import process definition
* @param loginUser login user
* @param file process metadata json file
* @param currentProjectName current project name
* @return
* /
@Transactional ( rollbackFor = Exception . class )
public Map < String , Object > importProcessDefinition ( User loginUser , MultipartFile file ) {
public Map < String , Object > importProcessDefinition ( User loginUser , MultipartFile file , String currentProjectName ) {
Map < String , Object > result = new HashMap < > ( 5 ) ;
JSONObject json = null ;
try ( InputStreamReader inputStreamReader = new InputStreamReader ( file . getInputStream ( ) , "UTF-8" ) ) {
JSONObject json ;
//read workflow json
try ( InputStreamReader inputStreamReader = new InputStreamReader ( file . getInputStream ( ) , StandardCharsets . UTF_8 ) ) {
BufferedReader streamReader = new BufferedReader ( inputStreamReader ) ;
StringBuilder respomseStrBuilder = new StringBuilder ( ) ;
String inputStr = "" ;
String inputStr ;
while ( ( inputStr = streamReader . readLine ( ) ) ! = null ) {
respomseStrBuilder . append ( inputStr ) ;
}
json = JSONObject . parseObject ( respomseStrBuilder . toString ( ) ) ;
if ( json ! = null ) {
String projectName = null ;
if ( null ! = json ) {
String originProjectName = null ;
String processDefinitionName = null ;
String processDefinitionJson = null ;
String processDefinitionDesc = null ;
@ -613,151 +687,254 @@ public class ProcessDefinitionService extends BaseDAGService {
String scheduleWorkerGroupId = null ;
String scheduleWorkerGroupName = null ;
if ( ObjectUtils . allNot Null ( json . get ( "projectName" ) ) ) {
p rojectName = json . get ( "projectName" ) . toString ( ) ;
if ( Objects . non Null ( json . get ( "projectName" ) ) ) {
originP rojectName = json . get ( "projectName" ) . toString ( ) ;
} else {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionName" ) ;
return result ;
}
if ( ObjectUtils . allNot Null ( json . get ( "processDefinitionName" ) ) ) {
if ( Objects . non Null ( json . get ( "processDefinitionName" ) ) ) {
processDefinitionName = json . get ( "processDefinitionName" ) . toString ( ) ;
} else {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionName" ) ;
return result ;
}
if ( ObjectUtils . allNot Null ( json . get ( "processDefinitionJson" ) ) ) {
if ( Objects . non Null ( json . get ( "processDefinitionJson" ) ) ) {
processDefinitionJson = json . get ( "processDefinitionJson" ) . toString ( ) ;
} else {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionJson" ) ;
return result ;
}
if ( ObjectUtils . allNot Null ( json . get ( "processDefinitionDescription" ) ) ) {
if ( Objects . non Null ( json . get ( "processDefinitionDescription" ) ) ) {
processDefinitionDesc = json . get ( "processDefinitionDescription" ) . toString ( ) ;
}
if ( ObjectUtils . allNot Null ( json . get ( "processDefinitionLocations" ) ) ) {
if ( Objects . non Null ( json . get ( "processDefinitionLocations" ) ) ) {
processDefinitionLocations = json . get ( "processDefinitionLocations" ) . toString ( ) ;
}
if ( ObjectUtils . allNot Null ( json . get ( "processDefinitionConnects" ) ) ) {
if ( Objects . non Null ( json . get ( "processDefinitionConnects" ) ) ) {
processDefinitionConnects = json . get ( "processDefinitionConnects" ) . toString ( ) ;
}
Project project = projectMapper . queryByName ( projectName ) ;
if ( project ! = null ) {
processDefinitionName = recursionProcessDefinitionName ( project . getId ( ) , processDefinitionName , 1 ) ;
}
//check user access for org project
Project originProject = projectMapper . queryByName ( originProjectName ) ;
Map < String , Object > checkResult = projectService . checkProjectAndAuth ( loginUser , originProject , originProjectName ) ;
Status resultStatus = ( Status ) checkResult . get ( Constants . STATUS ) ;
JSONObject jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
JSONArray jsonArray = ( JSONArray ) jsonObject . get ( "tasks" ) ;
for ( int j = 0 ; j < jsonArray . size ( ) ; j + + ) {
JSONObject taskNode = jsonArray . getJSONObject ( j ) ;
String taskType = taskNode . getString ( "type" ) ;
if ( taskType . equals ( TaskType . SQL . name ( ) ) | | taskType . equals ( TaskType . PROCEDURE . name ( ) ) ) {
JSONObject sqlParameters = JSONUtils . parseObject ( taskNode . getString ( "params" ) ) ;
List < DataSource > dataSources = dataSourceMapper . queryDataSourceByName ( sqlParameters . getString ( "datasourceName" ) ) ;
if ( dataSources . size ( ) > 0 ) {
DataSource dataSource = dataSources . get ( 0 ) ;
sqlParameters . put ( "datasource" , dataSource . getId ( ) ) ;
}
taskNode . put ( "params" , sqlParameters ) ;
} else if ( taskType . equals ( TaskType . DEPENDENT . name ( ) ) ) {
JSONObject dependentParameters = JSONUtils . parseObject ( taskNode . getString ( "dependence" ) ) ;
if ( dependentParameters ! = null ) {
JSONArray dependTaskList = ( JSONArray ) dependentParameters . get ( "dependTaskList" ) ;
for ( int h = 0 ; h < dependTaskList . size ( ) ; h + + ) {
JSONObject dependentTaskModel = dependTaskList . getJSONObject ( h ) ;
JSONArray dependItemList = ( JSONArray ) dependentTaskModel . get ( "dependItemList" ) ;
for ( int k = 0 ; k < dependItemList . size ( ) ; k + + ) {
JSONObject dependentItem = dependItemList . getJSONObject ( k ) ;
Project dependentItemProject = projectMapper . queryByName ( dependentItem . getString ( "projectName" ) ) ;
if ( dependentItemProject ! = null ) {
ProcessDefinition definition = processDefineMapper . queryByDefineName ( dependentItemProject . getId ( ) , dependentItem . getString ( "definitionName" ) ) ;
if ( definition ! = null ) {
dependentItem . put ( "projectId" , dependentItemProject . getId ( ) ) ;
dependentItem . put ( "definitionId" , definition . getId ( ) ) ;
if ( resultStatus = = Status . SUCCESS ) {
//use currentProjectName to query
Project targetProject = projectMapper . queryByName ( currentProjectName ) ;
if ( null ! = targetProject ) {
processDefinitionName = recursionProcessDefinitionName ( targetProject . getId ( ) , processDefinitionName , 1 ) ;
}
JSONObject jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
JSONArray jsonArray = ( JSONArray ) jsonObject . get ( "tasks" ) ;
for ( int j = 0 ; j < jsonArray . size ( ) ; j + + ) {
JSONObject taskNode = jsonArray . getJSONObject ( j ) ;
String taskType = taskNode . getString ( "type" ) ;
if ( checkTaskHasDataSource ( taskType ) ) {
JSONObject sqlParameters = JSONUtils . parseObject ( taskNode . getString ( "params" ) ) ;
List < DataSource > dataSources = dataSourceMapper . queryDataSourceByName ( sqlParameters . getString ( "datasourceName" ) ) ;
if ( ! dataSources . isEmpty ( ) ) {
DataSource dataSource = dataSources . get ( 0 ) ;
sqlParameters . put ( "datasource" , dataSource . getId ( ) ) ;
}
taskNode . put ( "params" , sqlParameters ) ;
} else if ( checkTaskHasDependent ( taskType ) ) {
JSONObject dependentParameters = JSONUtils . parseObject ( taskNode . getString ( "dependence" ) ) ;
if ( dependentParameters ! = null ) {
JSONArray dependTaskList = ( JSONArray ) dependentParameters . get ( "dependTaskList" ) ;
for ( int h = 0 ; h < dependTaskList . size ( ) ; h + + ) {
JSONObject dependentTaskModel = dependTaskList . getJSONObject ( h ) ;
JSONArray dependItemList = ( JSONArray ) dependentTaskModel . get ( "dependItemList" ) ;
for ( int k = 0 ; k < dependItemList . size ( ) ; k + + ) {
JSONObject dependentItem = dependItemList . getJSONObject ( k ) ;
Project dependentItemProject = projectMapper . queryByName ( dependentItem . getString ( "projectName" ) ) ;
if ( dependentItemProject ! = null ) {
ProcessDefinition definition = processDefineMapper . queryByDefineName ( dependentItemProject . getId ( ) , dependentItem . getString ( "definitionName" ) ) ;
if ( definition ! = null ) {
dependentItem . put ( "projectId" , dependentItemProject . getId ( ) ) ;
dependentItem . put ( "definitionId" , definition . getId ( ) ) ;
}
}
}
}
taskNode . put ( "dependence" , dependentParameters ) ;
}
taskNode . put ( "dependence" , dependentParameters ) ;
}
}
}
jsonObject . put ( "tasks" , jsonArray ) ;
Map < String , Object > createProcessDefinitionResult = createProcessDefinition ( loginUser , projectName , processDefinitionName , jsonObject . toString ( ) , processDefinitionDesc , processDefinitionLocations , processDefinitionConnects ) ;
Integer processDefinitionId = null ;
if ( ObjectUtils . allNotNull ( createProcessDefinitionResult . get ( "processDefinitionId" ) ) ) {
processDefinitionId = Integer . parseInt ( createProcessDefinitionResult . get ( "processDefinitionId" ) . toString ( ) ) ;
}
if ( ObjectUtils . allNotNull ( json . get ( "scheduleCrontab" ) ) & & processDefinitionId ! = null ) {
Date now = new Date ( ) ;
Schedule scheduleObj = new Schedule ( ) ;
scheduleObj . setProjectName ( projectName ) ;
scheduleObj . setProcessDefinitionId ( processDefinitionId ) ;
scheduleObj . setProcessDefinitionName ( processDefinitionName ) ;
scheduleObj . setCreateTime ( now ) ;
scheduleObj . setUpdateTime ( now ) ;
scheduleObj . setUserId ( loginUser . getId ( ) ) ;
scheduleObj . setUserName ( loginUser . getUserName ( ) ) ;
scheduleCrontab = json . get ( "scheduleCrontab" ) . toString ( ) ;
scheduleObj . setCrontab ( scheduleCrontab ) ;
if ( ObjectUtils . allNotNull ( json . get ( "scheduleStartTime" ) ) ) {
scheduleStartTime = json . get ( "scheduleStartTime" ) . toString ( ) ;
scheduleObj . setStartTime ( DateUtils . stringToDate ( scheduleStartTime ) ) ;
}
if ( ObjectUtils . allNotNull ( json . get ( "scheduleEndTime" ) ) ) {
scheduleEndTime = json . get ( "scheduleEndTime" ) . toString ( ) ;
scheduleObj . setEndTime ( DateUtils . stringToDate ( scheduleEndTime ) ) ;
}
if ( ObjectUtils . allNotNull ( json . get ( "scheduleWarningType" ) ) ) {
scheduleWarningType = json . get ( "scheduleWarningType" ) . toString ( ) ;
scheduleObj . setWarningType ( WarningType . valueOf ( scheduleWarningType ) ) ;
}
if ( ObjectUtils . allNotNull ( json . get ( "scheduleWarningGroupId" ) ) ) {
scheduleWarningGroupId = json . get ( "scheduleWarningGroupId" ) . toString ( ) ;
scheduleObj . setWarningGroupId ( Integer . parseInt ( scheduleWarningGroupId ) ) ;
}
if ( ObjectUtils . allNotNull ( json . get ( "scheduleFailureStrategy" ) ) ) {
scheduleFailureStrategy = json . get ( "scheduleFailureStrategy" ) . toString ( ) ;
scheduleObj . setFailureStrategy ( FailureStrategy . valueOf ( scheduleFailureStrategy ) ) ;
}
if ( ObjectUtils . allNotNull ( json . get ( "scheduleReleaseState" ) ) ) {
scheduleReleaseState = json . get ( "scheduleReleaseState" ) . toString ( ) ;
scheduleObj . setReleaseState ( ReleaseState . valueOf ( scheduleReleaseState ) ) ;
//recursive sub-process parameter correction map key for old process id value for new process id
Map < Integer , Integer > subProcessIdMap = new HashMap < > ( 20 ) ;
List < Object > subProcessList = jsonArray . stream ( )
. filter ( elem - > checkTaskHasSubProcess ( JSONUtils . parseObject ( elem . toString ( ) ) . getString ( "type" ) ) )
. collect ( Collectors . toList ( ) ) ;
if ( ! subProcessList . isEmpty ( ) ) {
importSubProcess ( loginUser , targetProject , jsonArray , subProcessIdMap ) ;
}
if ( ObjectUtils . allNotNull ( json . get ( "scheduleProcessInstancePriority" ) ) ) {
scheduleProcessInstancePriority = json . get ( "scheduleProcessInstancePriority" ) . toString ( ) ;
scheduleObj . setProcessInstancePriority ( Priority . valueOf ( scheduleProcessInstancePriority ) ) ;
jsonObject . put ( "tasks" , jsonArray ) ;
Map < String , Object > createProcessDefinitionResult = createProcessDefinition ( loginUser , currentProjectName , processDefinitionName , jsonObject . toString ( ) , processDefinitionDesc , processDefinitionLocations , processDefinitionConnects ) ;
Integer processDefinitionId = null ;
if ( Objects . nonNull ( createProcessDefinitionResult . get ( "processDefinitionId" ) ) ) {
processDefinitionId = Integer . parseInt ( createProcessDefinitionResult . get ( "processDefinitionId" ) . toString ( ) ) ;
}
if ( ObjectUtils . allNotNull ( json . get ( "scheduleWorkerGroupId" ) ) ) {
scheduleWorkerGroupId = json . get ( "scheduleWorkerGroupId" ) . toString ( ) ;
if ( scheduleWorkerGroupId ! = null ) {
scheduleObj . setWorkerGroupId ( Integer . parseInt ( scheduleWorkerGroupId ) ) ;
} else {
if ( ObjectUtils . allNotNull ( json . get ( "scheduleWorkerGroupName" ) ) ) {
scheduleWorkerGroupName = json . get ( "scheduleWorkerGroupName" ) . toString ( ) ;
List < WorkerGroup > workerGroups = workerGroupMapper . queryWorkerGroupByName ( scheduleWorkerGroupName ) ;
if ( workerGroups . size ( ) > 0 ) {
scheduleObj . setWorkerGroupId ( workerGroups . get ( 0 ) . getId ( ) ) ;
if ( Objects . nonNull ( json . get ( "scheduleCrontab" ) ) & & processDefinitionId ! = null ) {
Date now = new Date ( ) ;
Schedule scheduleObj = new Schedule ( ) ;
scheduleObj . setProjectName ( currentProjectName ) ;
scheduleObj . setProcessDefinitionId ( processDefinitionId ) ;
scheduleObj . setProcessDefinitionName ( processDefinitionName ) ;
scheduleObj . setCreateTime ( now ) ;
scheduleObj . setUpdateTime ( now ) ;
scheduleObj . setUserId ( loginUser . getId ( ) ) ;
scheduleObj . setUserName ( loginUser . getUserName ( ) ) ;
scheduleCrontab = json . get ( "scheduleCrontab" ) . toString ( ) ;
scheduleObj . setCrontab ( scheduleCrontab ) ;
if ( Objects . nonNull ( json . get ( "scheduleStartTime" ) ) ) {
scheduleStartTime = json . get ( "scheduleStartTime" ) . toString ( ) ;
scheduleObj . setStartTime ( DateUtils . stringToDate ( scheduleStartTime ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleEndTime" ) ) ) {
scheduleEndTime = json . get ( "scheduleEndTime" ) . toString ( ) ;
scheduleObj . setEndTime ( DateUtils . stringToDate ( scheduleEndTime ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleWarningType" ) ) ) {
scheduleWarningType = json . get ( "scheduleWarningType" ) . toString ( ) ;
scheduleObj . setWarningType ( WarningType . valueOf ( scheduleWarningType ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleWarningGroupId" ) ) ) {
scheduleWarningGroupId = json . get ( "scheduleWarningGroupId" ) . toString ( ) ;
scheduleObj . setWarningGroupId ( Integer . parseInt ( scheduleWarningGroupId ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleFailureStrategy" ) ) ) {
scheduleFailureStrategy = json . get ( "scheduleFailureStrategy" ) . toString ( ) ;
scheduleObj . setFailureStrategy ( FailureStrategy . valueOf ( scheduleFailureStrategy ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleReleaseState" ) ) ) {
scheduleReleaseState = json . get ( "scheduleReleaseState" ) . toString ( ) ;
scheduleObj . setReleaseState ( ReleaseState . valueOf ( scheduleReleaseState ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleProcessInstancePriority" ) ) ) {
scheduleProcessInstancePriority = json . get ( "scheduleProcessInstancePriority" ) . toString ( ) ;
scheduleObj . setProcessInstancePriority ( Priority . valueOf ( scheduleProcessInstancePriority ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleWorkerGroupId" ) ) ) {
scheduleWorkerGroupId = json . get ( "scheduleWorkerGroupId" ) . toString ( ) ;
if ( scheduleWorkerGroupId ! = null ) {
scheduleObj . setWorkerGroupId ( Integer . parseInt ( scheduleWorkerGroupId ) ) ;
} else {
if ( Objects . nonNull ( json . get ( "scheduleWorkerGroupName" ) ) ) {
scheduleWorkerGroupName = json . get ( "scheduleWorkerGroupName" ) . toString ( ) ;
List < WorkerGroup > workerGroups = workerGroupMapper . queryWorkerGroupByName ( scheduleWorkerGroupName ) ;
if ( ! workerGroups . isEmpty ( ) ) {
scheduleObj . setWorkerGroupId ( workerGroups . get ( 0 ) . getId ( ) ) ;
}
}
}
}
scheduleMapper . insert ( scheduleObj ) ;
}
scheduleMapper . insert ( scheduleObj ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
}
} else {
putMsg ( result , Status . EXPORT_PROCESS_DEFINE_BY_ID_ERROR ) ;
putMsg ( result , Status . IMPORT_PROCESS_DEFINE _ERROR) ;
return result ;
}
} catch ( IOException e ) {
throw new RuntimeException ( e . getMessage ( ) , e ) ;
}
putMsg ( result , Status . SUCCESS ) ;
return result ;
}
/ * *
* check import process has sub process
* recursion create sub process
* @param loginUser login user
* @param targetProject target project
* /
public void importSubProcess ( User loginUser , Project targetProject , JSONArray jsonArray , Map < Integer , Integer > subProcessIdMap ) {
for ( int i = 0 ; i < jsonArray . size ( ) ; i + + ) {
JSONObject taskNode = jsonArray . getJSONObject ( i ) ;
String taskType = taskNode . getString ( "type" ) ;
if ( checkTaskHasSubProcess ( taskType ) ) {
//get sub process info
JSONObject subParams = JSONUtils . parseObject ( taskNode . getString ( "params" ) ) ;
Integer subProcessId = subParams . getInteger ( "processDefinitionId" ) ;
ProcessDefinition subProcess = processDefineMapper . queryByDefineId ( subProcessId ) ;
String subProcessJson = subProcess . getProcessDefinitionJson ( ) ;
//check current project has sub process
ProcessDefinition currentProjectSubProcess = processDefineMapper . queryByDefineName ( targetProject . getId ( ) , subProcess . getName ( ) ) ;
if ( null = = currentProjectSubProcess ) {
JSONArray subJsonArray = ( JSONArray ) JSONUtils . parseObject ( subProcess . getProcessDefinitionJson ( ) ) . get ( "tasks" ) ;
List < Object > subProcessList = subJsonArray . stream ( )
. filter ( item - > checkTaskHasSubProcess ( JSONUtils . parseObject ( item . toString ( ) ) . getString ( "type" ) ) )
. collect ( Collectors . toList ( ) ) ;
if ( ! subProcessList . isEmpty ( ) ) {
importSubProcess ( loginUser , targetProject , subJsonArray , subProcessIdMap ) ;
//sub process processId correct
if ( ! subProcessIdMap . isEmpty ( ) ) {
for ( Map . Entry < Integer , Integer > entry : subProcessIdMap . entrySet ( ) ) {
String oldSubProcessId = "\"processDefinitionId\":" + entry . getKey ( ) ;
String newSubProcessId = "\"processDefinitionId\":" + entry . getValue ( ) ;
subProcessJson = subProcessJson . replaceAll ( oldSubProcessId , newSubProcessId ) ;
}
subProcessIdMap . clear ( ) ;
}
}
//if sub-process recursion
Date now = new Date ( ) ;
//create sub process in target project
ProcessDefinition processDefine = new ProcessDefinition ( ) ;
processDefine . setName ( subProcess . getName ( ) ) ;
processDefine . setVersion ( subProcess . getVersion ( ) ) ;
processDefine . setReleaseState ( subProcess . getReleaseState ( ) ) ;
processDefine . setProjectId ( targetProject . getId ( ) ) ;
processDefine . setUserId ( loginUser . getId ( ) ) ;
processDefine . setProcessDefinitionJson ( subProcessJson ) ;
processDefine . setDescription ( subProcess . getDescription ( ) ) ;
processDefine . setLocations ( subProcess . getLocations ( ) ) ;
processDefine . setConnects ( subProcess . getConnects ( ) ) ;
processDefine . setTimeout ( subProcess . getTimeout ( ) ) ;
processDefine . setTenantId ( subProcess . getTenantId ( ) ) ;
processDefine . setGlobalParams ( subProcess . getGlobalParams ( ) ) ;
processDefine . setCreateTime ( now ) ;
processDefine . setUpdateTime ( now ) ;
processDefine . setFlag ( subProcess . getFlag ( ) ) ;
processDefine . setReceivers ( subProcess . getReceivers ( ) ) ;
processDefine . setReceiversCc ( subProcess . getReceiversCc ( ) ) ;
processDefineMapper . insert ( processDefine ) ;
logger . info ( "create sub process, project: {}, process name: {}" , targetProject . getName ( ) , processDefine . getName ( ) ) ;
//modify task node
ProcessDefinition newSubProcessDefine = processDefineMapper . queryByDefineName ( processDefine . getProjectId ( ) , processDefine . getName ( ) ) ;
if ( null ! = newSubProcessDefine ) {
subProcessIdMap . put ( subProcessId , newSubProcessDefine . getId ( ) ) ;
subParams . put ( "processDefinitionId" , newSubProcessDefine . getId ( ) ) ;
taskNode . put ( "params" , subParams ) ;
}
}
}
}
}
/ * *