@ -22,11 +22,15 @@ import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage ;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page ;
import com.fasterxml.jackson.core.JsonProcessingException ;
import org.apache.dolphinscheduler.api.dto.ProcessMeta ;
import org.apache.dolphinscheduler.api.dto.treeview.Instance ;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto ;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.api.utils.CheckUtils ;
import org.apache.dolphinscheduler.api.utils.FileUtils ;
import org.apache.dolphinscheduler.api.utils.PageInfo ;
import org.apache.dolphinscheduler.api.utils.exportprocess.ProcessAddTaskParam ;
import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.* ;
import org.apache.dolphinscheduler.common.graph.DAG ;
@ -53,9 +57,7 @@ import org.springframework.web.multipart.MultipartFile;
import javax.servlet.ServletOutputStream ;
import javax.servlet.http.HttpServletResponse ;
import java.io.BufferedOutputStream ;
import java.io.BufferedReader ;
import java.io.IOException ;
import java.io.InputStreamReader ;
import java.nio.charset.StandardCharsets ;
import java.util.* ;
import java.util.concurrent.ConcurrentHashMap ;
@ -94,9 +96,6 @@ public class ProcessDefinitionService extends BaseDAGService {
private ProcessDao processDao ;
private DataSourceMapper dataSourceMapper ;
private WorkerGroupMapper workerGroupMapper ;
@ -496,43 +495,7 @@ public class ProcessDefinitionService extends BaseDAGService {
ProcessDefinition processDefinition = processDefineMapper . queryByDefineId ( processDefinitionId ) ;
if ( null ! = processDefinition ) {
//correct task param which has data source or dependent param
String correctProcessDefinitionJson = addTaskNodeSpecialParam ( processDefinition . getProcessDefinitionJson ( ) ) ;
processDefinition . setProcessDefinitionJson ( correctProcessDefinitionJson ) ;
Map < String , Object > row = new LinkedHashMap < > ( ) ;
row . put ( "projectName" , processDefinition . getProjectName ( ) ) ;
row . put ( "processDefinitionName" , processDefinition . getName ( ) ) ;
row . put ( "processDefinitionJson" , processDefinition . getProcessDefinitionJson ( ) ) ;
row . put ( "processDefinitionDescription" , processDefinition . getDescription ( ) ) ;
row . put ( "processDefinitionLocations" , processDefinition . getLocations ( ) ) ;
row . put ( "processDefinitionConnects" , processDefinition . getConnects ( ) ) ;
//schedule info
List < Schedule > schedules = scheduleMapper . queryByProcessDefinitionId ( processDefinitionId ) ;
if ( ! schedules . isEmpty ( ) ) {
Schedule schedule = schedules . get ( 0 ) ;
row . put ( "scheduleWarningType" , schedule . getWarningType ( ) ) ;
row . put ( "scheduleWarningGroupId" , schedule . getWarningGroupId ( ) ) ;
row . put ( "scheduleStartTime" , DateUtils . dateToString ( schedule . getStartTime ( ) ) ) ;
row . put ( "scheduleEndTime" , DateUtils . dateToString ( schedule . getEndTime ( ) ) ) ;
row . put ( "scheduleCrontab" , schedule . getCrontab ( ) ) ;
row . put ( "scheduleFailureStrategy" , schedule . getFailureStrategy ( ) ) ;
row . put ( "scheduleReleaseState" , ReleaseState . OFFLINE ) ;
row . put ( "scheduleProcessInstancePriority" , schedule . getProcessInstancePriority ( ) ) ;
if ( schedule . getId ( ) = = - 1 ) {
row . put ( "scheduleWorkerGroupId" , - 1 ) ;
} else {
WorkerGroup workerGroup = workerGroupMapper . selectById ( schedule . getWorkerGroupId ( ) ) ;
if ( workerGroup ! = null ) {
row . put ( "scheduleWorkerGroupName" , workerGroup . getName ( ) ) ;
//create workflow json file
String rowsJson = JSONUtils . toJsonString ( row ) ;
String exportProcessJson = exportProcessMetaDataStr ( processDefinitionId , processDefinition ) ;
response . setContentType ( MediaType . APPLICATION_JSON_UTF8_VALUE ) ;
response . setHeader ( "Content-Disposition" , "attachment;filename=" + processDefinition . getName ( ) + ".json" ) ;
BufferedOutputStream buff = null ;
@ -540,7 +503,7 @@ public class ProcessDefinitionService extends BaseDAGService {
try {
out = response . getOutputStream ( ) ;
buff = new BufferedOutputStream ( out ) ;
buff . write ( row sJson. getBytes ( StandardCharsets . UTF_8 ) ) ;
buff . write ( exportProces sJson. getBytes ( StandardCharsets . UTF_8 ) ) ;
buff . flush ( ) ;
buff . close ( ) ;
} catch ( IOException e ) {
@ -560,18 +523,66 @@ public class ProcessDefinitionService extends BaseDAGService {
logger . warn ( "export process output stream not close" , e ) ;
/ * *
* get export process metadata string
* @param processDefinitionId process definition id
* @param processDefinition process definition
* @return export process metadata string
* /
public String exportProcessMetaDataStr ( Integer processDefinitionId , ProcessDefinition processDefinition ) {
//correct task param which has data source or dependent param
String correctProcessDefinitionJson = addExportTaskNodeSpecialParam ( processDefinition . getProcessDefinitionJson ( ) ) ;
processDefinition . setProcessDefinitionJson ( correctProcessDefinitionJson ) ;
//export process metadata
ProcessMeta exportProcessMeta = new ProcessMeta ( ) ;
exportProcessMeta . setProjectName ( processDefinition . getProjectName ( ) ) ;
exportProcessMeta . setProcessDefinitionName ( processDefinition . getName ( ) ) ;
exportProcessMeta . setProcessDefinitionJson ( processDefinition . getProcessDefinitionJson ( ) ) ;
exportProcessMeta . setProcessDefinitionLocations ( processDefinition . getLocations ( ) ) ;
exportProcessMeta . setProcessDefinitionConnects ( processDefinition . getConnects ( ) ) ;
//schedule info
List < Schedule > schedules = scheduleMapper . queryByProcessDefinitionId ( processDefinitionId ) ;
if ( ! schedules . isEmpty ( ) ) {
Schedule schedule = schedules . get ( 0 ) ;
WorkerGroup workerGroup = workerGroupMapper . selectById ( schedule . getWorkerGroupId ( ) ) ;
if ( null = = workerGroup & & schedule . getWorkerGroupId ( ) = = - 1 ) {
workerGroup = new WorkerGroup ( ) ;
workerGroup . setId ( - 1 ) ;
workerGroup . setName ( "" ) ;
exportProcessMeta . setScheduleWarningType ( schedule . getWarningType ( ) . toString ( ) ) ;
exportProcessMeta . setScheduleWarningGroupId ( schedule . getWarningGroupId ( ) ) ;
exportProcessMeta . setScheduleStartTime ( DateUtils . dateToString ( schedule . getStartTime ( ) ) ) ;
exportProcessMeta . setScheduleEndTime ( DateUtils . dateToString ( schedule . getEndTime ( ) ) ) ;
exportProcessMeta . setScheduleCrontab ( schedule . getCrontab ( ) ) ;
exportProcessMeta . setScheduleFailureStrategy ( String . valueOf ( schedule . getFailureStrategy ( ) ) ) ;
exportProcessMeta . setScheduleReleaseState ( String . valueOf ( ReleaseState . OFFLINE ) ) ;
exportProcessMeta . setScheduleProcessInstancePriority ( String . valueOf ( schedule . getProcessInstancePriority ( ) ) ) ;
if ( null ! = workerGroup ) {
exportProcessMeta . setScheduleWorkerGroupId ( workerGroup . getId ( ) ) ;
exportProcessMeta . setScheduleWorkerGroupName ( workerGroup . getName ( ) ) ;
//create workflow json file
return JSONUtils . toJsonString ( exportProcessMeta ) ;
/ * *
* correct task param which has datasource or dependent
* @param processDefinitionJson processDefinitionJson
* @return correct processDefinitionJson
* /
public String addTaskNodeSpecialParam ( String processDefinitionJson ) {
public String addExport TaskNodeSpecialParam ( String processDefinitionJson ) {
JSONObject jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
JSONArray jsonArray = ( JSONArray ) jsonObject . get ( "tasks" ) ;
@ -580,35 +591,9 @@ public class ProcessDefinitionService extends BaseDAGService {
if ( StringUtils . isNotEmpty ( taskNode . getString ( "type" ) ) ) {
String taskType = taskNode . getString ( "type" ) ;
if ( checkTaskHasDataSource ( taskType ) ) {
// add sqlParameters
JSONObject sqlParameters = JSONUtils . parseObject ( taskNode . getString ( "params" ) ) ;
DataSource dataSource = dataSourceMapper . selectById ( ( Integer ) sqlParameters . get ( "datasource" ) ) ;
if ( null ! = dataSource ) {
sqlParameters . put ( "datasourceName" , dataSource . getName ( ) ) ;
taskNode . put ( "params" , sqlParameters ) ;
} else if ( checkTaskHasDependent ( taskType ) ) {
// add dependent param
JSONObject dependentParameters = JSONUtils . parseObject ( taskNode . getString ( "dependence" ) ) ;
if ( null ! = dependentParameters ) {
JSONArray dependTaskList = ( JSONArray ) dependentParameters . get ( "dependTaskList" ) ;
for ( int j = 0 ; j < dependTaskList . size ( ) ; j + + ) {
JSONObject dependentTaskModel = dependTaskList . getJSONObject ( j ) ;
JSONArray dependItemList = ( JSONArray ) dependentTaskModel . get ( "dependItemList" ) ;
for ( int k = 0 ; k < dependItemList . size ( ) ; k + + ) {
JSONObject dependentItem = dependItemList . getJSONObject ( k ) ;
int definitionId = dependentItem . getInteger ( "definitionId" ) ;
ProcessDefinition definition = processDefineMapper . queryByDefineId ( definitionId ) ;
if ( null ! = definition ) {
dependentItem . put ( "projectName" , definition . getProjectName ( ) ) ;
dependentItem . put ( "definitionName" , definition . getName ( ) ) ;
taskNode . put ( "dependence" , dependentParameters ) ;
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory . getByTaskType ( taskType ) ;
if ( null ! = addTaskParam ) {
addTaskParam . addExportSpecialParam ( taskNode ) ;
@ -616,24 +601,6 @@ public class ProcessDefinitionService extends BaseDAGService {
return jsonObject . toString ( ) ;
/ * *
* check task if has dependent
* @param taskType task type
* @return if task has dependent return true else false
* /
private boolean checkTaskHasDependent ( String taskType ) {
return taskType . equals ( TaskType . DEPENDENT . name ( ) ) ;
/ * *
* check task if has data source info
* @param taskType task type
* @return if task has data source return true else false
* /
private boolean checkTaskHasDataSource ( String taskType ) {
return taskType . equals ( TaskType . SQL . name ( ) ) | | taskType . equals ( TaskType . PROCEDURE . name ( ) ) ;
/ * *
* check task if has sub process
* @param taskType task type
@ -648,211 +615,173 @@ public class ProcessDefinitionService extends BaseDAGService {
* @param loginUser login user
* @param file process metadata json file
* @param currentProjectName current project name
* @return
* @return import process
* /
@Transactional ( rollbackFor = Exception . class )
public Map < String , Object > importProcessDefinition ( User loginUser , MultipartFile file , String currentProjectName ) {
Map < String , Object > result = new HashMap < > ( 5 ) ;
String processMetaJson = FileUtils . file2String ( file ) ;
ProcessMeta processMeta = JSONUtils . parseObject ( processMetaJson , ProcessMeta . class ) ;
JSONObject json ;
//check file content
if ( null = = processMeta ) {
putMsg ( result , Status . DATA_IS_NULL , "fileContent" ) ;
return result ;
if ( StringUtils . isEmpty ( processMeta . getProjectName ( ) ) ) {
putMsg ( result , Status . DATA_IS_NULL , "projectName" ) ;
return result ;
if ( StringUtils . isEmpty ( processMeta . getProcessDefinitionName ( ) ) ) {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionName" ) ;
return result ;
if ( StringUtils . isEmpty ( processMeta . getProcessDefinitionJson ( ) ) ) {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionJson" ) ;
return result ;
//read workflow json
try ( InputStreamReader inputStreamReader = new InputStreamReader ( file . getInputStream ( ) , StandardCharsets . UTF_8 ) ) {
BufferedReader streamReader = new BufferedReader ( inputStreamReader ) ;
StringBuilder respomseStrBuilder = new StringBuilder ( ) ;
String inputStr ;
//deal with process name
String processDefinitionName = processMeta . getProcessDefinitionName ( ) ;
//use currentProjectName to query
Project targetProject = projectMapper . queryByName ( currentProjectName ) ;
if ( null ! = targetProject ) {
processDefinitionName = recursionProcessDefinitionName ( targetProject . getId ( ) ,
processDefinitionName , 1 ) ;
while ( ( inputStr = streamReader . readLine ( ) ) ! = null ) {
respomseStrBuilder . append ( inputStr ) ;
//add special task param
String importProcessParam = addImportTaskNodeParam ( loginUser , processMeta . getProcessDefinitionJson ( ) , targetProject ) ;
json = JSONObject . parseObject ( respomseStrBuilder . toString ( ) ) ;
if ( null ! = json ) {
String originProjectName = null ;
String processDefinitionName = null ;
String processDefinitionJson = null ;
String processDefinitionDesc = null ;
String processDefinitionLocations = null ;
String processDefinitionConnects = null ;
String scheduleWarningType = null ;
String scheduleWarningGroupId = null ;
String scheduleStartTime = null ;
String scheduleEndTime = null ;
String scheduleCrontab = null ;
String scheduleFailureStrategy = null ;
String scheduleReleaseState = null ;
String scheduleProcessInstancePriority = null ;
String scheduleWorkerGroupId = null ;
String scheduleWorkerGroupName = null ;
if ( Objects . nonNull ( json . get ( "projectName" ) ) ) {
originProjectName = json . get ( "projectName" ) . toString ( ) ;
} else {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionName" ) ;
return result ;
if ( Objects . nonNull ( json . get ( "processDefinitionName" ) ) ) {
processDefinitionName = json . get ( "processDefinitionName" ) . toString ( ) ;
} else {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionName" ) ;
return result ;
if ( Objects . nonNull ( json . get ( "processDefinitionJson" ) ) ) {
processDefinitionJson = json . get ( "processDefinitionJson" ) . toString ( ) ;
} else {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionJson" ) ;
return result ;
if ( Objects . nonNull ( json . get ( "processDefinitionDescription" ) ) ) {
processDefinitionDesc = json . get ( "processDefinitionDescription" ) . toString ( ) ;
if ( Objects . nonNull ( json . get ( "processDefinitionLocations" ) ) ) {
processDefinitionLocations = json . get ( "processDefinitionLocations" ) . toString ( ) ;
if ( Objects . nonNull ( json . get ( "processDefinitionConnects" ) ) ) {
processDefinitionConnects = json . get ( "processDefinitionConnects" ) . toString ( ) ;
//check user access for org project
Project originProject = projectMapper . queryByName ( originProjectName ) ;
Map < String , Object > checkResult = projectService . checkProjectAndAuth ( loginUser , originProject , originProjectName ) ;
Status resultStatus = ( Status ) checkResult . get ( Constants . STATUS ) ;
Map < String , Object > createProcessResult ;
try {
createProcessResult = createProcessDefinition ( loginUser
, currentProjectName ,
processDefinitionName ,
importProcessParam ,
processMeta . getProcessDefinitionDescription ( ) ,
processMeta . getProcessDefinitionLocations ( ) ,
processMeta . getProcessDefinitionConnects ( ) ) ;
} catch ( JsonProcessingException e ) {
logger . error ( "import process meta json data: {}" , e . getMessage ( ) , e ) ;
putMsg ( result , Status . IMPORT_PROCESS_DEFINE_ERROR ) ;
return result ;
if ( resultStatus = = Status . SUCCESS ) {
//use currentProjectName to query
Project targetProject = projectMapper . queryByName ( currentProjectName ) ;
if ( null ! = targetProject ) {
processDefinitionName = recursionProcessDefinitionName ( targetProject . getId ( ) , processDefinitionName , 1 ) ;
putMsg ( result , Status . SUCCESS ) ;
//create process definition
Integer processDefinitionId = null ;
if ( null ! = createProcessResult & & Objects . nonNull ( createProcessResult . get ( "processDefinitionId" ) ) ) {
processDefinitionId = Integer . parseInt ( createProcessResult . get ( "processDefinitionId" ) . toString ( ) ) ;
//scheduler param
if ( null ! = processMeta . getScheduleCrontab ( ) & & null ! = processDefinitionId ) {
int scheduleInsert = importProcessSchedule ( loginUser ,
currentProjectName ,
processMeta ,
processDefinitionName ,
processDefinitionId ) ;
if ( 0 = = scheduleInsert ) {
putMsg ( result , Status . IMPORT_PROCESS_DEFINE_ERROR ) ;
return result ;
JSONObject jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
JSONArray jsonArray = ( JSONArray ) jsonObject . get ( "tasks" ) ;
for ( int j = 0 ; j < jsonArray . size ( ) ; j + + ) {
JSONObject taskNode = jsonArray . getJSONObject ( j ) ;
String taskType = taskNode . getString ( "type" ) ;
if ( checkTaskHasDataSource ( taskType ) ) {
JSONObject sqlParameters = JSONUtils . parseObject ( taskNode . getString ( "params" ) ) ;
List < DataSource > dataSources = dataSourceMapper . queryDataSourceByName ( sqlParameters . getString ( "datasourceName" ) ) ;
if ( ! dataSources . isEmpty ( ) ) {
DataSource dataSource = dataSources . get ( 0 ) ;
sqlParameters . put ( "datasource" , dataSource . getId ( ) ) ;
taskNode . put ( "params" , sqlParameters ) ;
} else if ( checkTaskHasDependent ( taskType ) ) {
JSONObject dependentParameters = JSONUtils . parseObject ( taskNode . getString ( "dependence" ) ) ;
if ( dependentParameters ! = null ) {
JSONArray dependTaskList = ( JSONArray ) dependentParameters . get ( "dependTaskList" ) ;
for ( int h = 0 ; h < dependTaskList . size ( ) ; h + + ) {
JSONObject dependentTaskModel = dependTaskList . getJSONObject ( h ) ;
JSONArray dependItemList = ( JSONArray ) dependentTaskModel . get ( "dependItemList" ) ;
for ( int k = 0 ; k < dependItemList . size ( ) ; k + + ) {
JSONObject dependentItem = dependItemList . getJSONObject ( k ) ;
Project dependentItemProject = projectMapper . queryByName ( dependentItem . getString ( "projectName" ) ) ;
if ( dependentItemProject ! = null ) {
ProcessDefinition definition = processDefineMapper . queryByDefineName ( dependentItemProject . getId ( ) , dependentItem . getString ( "definitionName" ) ) ;
if ( definition ! = null ) {
dependentItem . put ( "projectId" , dependentItemProject . getId ( ) ) ;
dependentItem . put ( "definitionId" , definition . getId ( ) ) ;
taskNode . put ( "dependence" , dependentParameters ) ;
return result ;
//recursive sub-process parameter correction map key for old process id value for new process id
Map < Integer , Integer > subProcessIdMap = new HashMap < > ( 20 ) ;
/ * *
* import process add special task param
* @param loginUser login user
* @param processDefinitionJson process definition json
* @param targetProject target project
* @return import process param
* /
private String addImportTaskNodeParam ( User loginUser , String processDefinitionJson , Project targetProject ) {
JSONObject jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
JSONArray jsonArray = ( JSONArray ) jsonObject . get ( "tasks" ) ;
//add sql and dependent param
for ( int i = 0 ; i < jsonArray . size ( ) ; i + + ) {
JSONObject taskNode = jsonArray . getJSONObject ( i ) ;
String taskType = taskNode . getString ( "type" ) ;
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory . getByTaskType ( taskType ) ;
if ( null ! = addTaskParam ) {
addTaskParam . addImportSpecialParam ( taskNode ) ;
List < Object > subProcessList = jsonArray . stream ( )
. filter ( elem - > checkTaskHasSubProcess ( JSONUtils . parseObject ( elem . toString ( ) ) . getString ( "type" ) ) )
. collect ( Collectors . toList ( ) ) ;
//recursive sub-process parameter correction map key for old process id value for new process id
Map < Integer , Integer > subProcessIdMap = new HashMap < > ( 20 ) ;
if ( ! subProcessList . isEmpty ( ) ) {
importSubProcess ( loginUser , targetProject , jsonArray , subProcessIdMap ) ;
List < Object > subProcessList = jsonArray . stream ( )
. filter ( elem - > checkTaskHasSubProcess ( JSONUtils . parseObject ( elem . toString ( ) ) . getString ( "type" ) ) )
. collect ( Collectors . toList ( ) ) ;
jsonObject . put ( "tasks" , jsonArray ) ;
if ( CollectionUtils . isNotEmpty ( subProcessList ) ) {
importSubProcess ( loginUser , targetProject , jsonArray , subProcessIdMap ) ;
Map < String , Object > createProcessDefinitionResult = createProcessDefinition ( loginUser , currentProjectName , processDefinitionName , jsonObject . toString ( ) , processDefinitionDesc , processDefinitionLocations , processDefinitionConnects ) ;
Integer processDefinitionId = null ;
if ( Objects . nonNull ( createProcessDefinitionResult . get ( "processDefinitionId" ) ) ) {
processDefinitionId = Integer . parseInt ( createProcessDefinitionResult . get ( "processDefinitionId" ) . toString ( ) ) ;
if ( Objects . nonNull ( json . get ( "scheduleCrontab" ) ) & & processDefinitionId ! = null ) {
Date now = new Date ( ) ;
Schedule scheduleObj = new Schedule ( ) ;
scheduleObj . setProjectName ( currentProjectName ) ;
scheduleObj . setProcessDefinitionId ( processDefinitionId ) ;
scheduleObj . setProcessDefinitionName ( processDefinitionName ) ;
scheduleObj . setCreateTime ( now ) ;
scheduleObj . setUpdateTime ( now ) ;
scheduleObj . setUserId ( loginUser . getId ( ) ) ;
scheduleObj . setUserName ( loginUser . getUserName ( ) ) ;
scheduleCrontab = json . get ( "scheduleCrontab" ) . toString ( ) ;
scheduleObj . setCrontab ( scheduleCrontab ) ;
if ( Objects . nonNull ( json . get ( "scheduleStartTime" ) ) ) {
scheduleStartTime = json . get ( "scheduleStartTime" ) . toString ( ) ;
scheduleObj . setStartTime ( DateUtils . stringToDate ( scheduleStartTime ) ) ;
if ( Objects . nonNull ( json . get ( "scheduleEndTime" ) ) ) {
scheduleEndTime = json . get ( "scheduleEndTime" ) . toString ( ) ;
scheduleObj . setEndTime ( DateUtils . stringToDate ( scheduleEndTime ) ) ;
if ( Objects . nonNull ( json . get ( "scheduleWarningType" ) ) ) {
scheduleWarningType = json . get ( "scheduleWarningType" ) . toString ( ) ;
scheduleObj . setWarningType ( WarningType . valueOf ( scheduleWarningType ) ) ;
if ( Objects . nonNull ( json . get ( "scheduleWarningGroupId" ) ) ) {
scheduleWarningGroupId = json . get ( "scheduleWarningGroupId" ) . toString ( ) ;
scheduleObj . setWarningGroupId ( Integer . parseInt ( scheduleWarningGroupId ) ) ;
if ( Objects . nonNull ( json . get ( "scheduleFailureStrategy" ) ) ) {
scheduleFailureStrategy = json . get ( "scheduleFailureStrategy" ) . toString ( ) ;
scheduleObj . setFailureStrategy ( FailureStrategy . valueOf ( scheduleFailureStrategy ) ) ;
if ( Objects . nonNull ( json . get ( "scheduleReleaseState" ) ) ) {
scheduleReleaseState = json . get ( "scheduleReleaseState" ) . toString ( ) ;
scheduleObj . setReleaseState ( ReleaseState . valueOf ( scheduleReleaseState ) ) ;
if ( Objects . nonNull ( json . get ( "scheduleProcessInstancePriority" ) ) ) {
scheduleProcessInstancePriority = json . get ( "scheduleProcessInstancePriority" ) . toString ( ) ;
scheduleObj . setProcessInstancePriority ( Priority . valueOf ( scheduleProcessInstancePriority ) ) ;
if ( Objects . nonNull ( json . get ( "scheduleWorkerGroupId" ) ) ) {
scheduleWorkerGroupId = json . get ( "scheduleWorkerGroupId" ) . toString ( ) ;
if ( scheduleWorkerGroupId ! = null ) {
scheduleObj . setWorkerGroupId ( Integer . parseInt ( scheduleWorkerGroupId ) ) ;
} else {
if ( Objects . nonNull ( json . get ( "scheduleWorkerGroupName" ) ) ) {
scheduleWorkerGroupName = json . get ( "scheduleWorkerGroupName" ) . toString ( ) ;
List < WorkerGroup > workerGroups = workerGroupMapper . queryWorkerGroupByName ( scheduleWorkerGroupName ) ;
if ( ! workerGroups . isEmpty ( ) ) {
scheduleObj . setWorkerGroupId ( workerGroups . get ( 0 ) . getId ( ) ) ;
scheduleMapper . insert ( scheduleObj ) ;
jsonObject . put ( "tasks" , jsonArray ) ;
return jsonObject . toString ( ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
/ * *
* import process schedule
* @param loginUser login user
* @param currentProjectName current project name
* @param processMeta process meta data
* @param processDefinitionName process definition name
* @param processDefinitionId process definition id
* @return insert schedule flag
* /
public int importProcessSchedule ( User loginUser , String currentProjectName , ProcessMeta processMeta ,
String processDefinitionName , Integer processDefinitionId ) {
Date now = new Date ( ) ;
Schedule scheduleObj = new Schedule ( ) ;
scheduleObj . setProjectName ( currentProjectName ) ;
scheduleObj . setProcessDefinitionId ( processDefinitionId ) ;
scheduleObj . setProcessDefinitionName ( processDefinitionName ) ;
scheduleObj . setCreateTime ( now ) ;
scheduleObj . setUpdateTime ( now ) ;
scheduleObj . setUserId ( loginUser . getId ( ) ) ;
scheduleObj . setUserName ( loginUser . getUserName ( ) ) ;
scheduleObj . setCrontab ( processMeta . getScheduleCrontab ( ) ) ;
if ( null ! = processMeta . getScheduleStartTime ( ) ) {
scheduleObj . setStartTime ( DateUtils . stringToDate ( processMeta . getScheduleStartTime ( ) ) ) ;
if ( null ! = processMeta . getScheduleEndTime ( ) ) {
scheduleObj . setEndTime ( DateUtils . stringToDate ( processMeta . getScheduleEndTime ( ) ) ) ;
if ( null ! = processMeta . getScheduleWarningType ( ) ) {
scheduleObj . setWarningType ( WarningType . valueOf ( processMeta . getScheduleWarningType ( ) ) ) ;
if ( null ! = processMeta . getScheduleWarningGroupId ( ) ) {
scheduleObj . setWarningGroupId ( processMeta . getScheduleWarningGroupId ( ) ) ;
if ( null ! = processMeta . getScheduleFailureStrategy ( ) ) {
scheduleObj . setFailureStrategy ( FailureStrategy . valueOf ( processMeta . getScheduleFailureStrategy ( ) ) ) ;
if ( null ! = processMeta . getScheduleReleaseState ( ) ) {
scheduleObj . setReleaseState ( ReleaseState . valueOf ( processMeta . getScheduleReleaseState ( ) ) ) ;
if ( null ! = processMeta . getScheduleProcessInstancePriority ( ) ) {
scheduleObj . setProcessInstancePriority ( Priority . valueOf ( processMeta . getScheduleProcessInstancePriority ( ) ) ) ;
if ( null ! = processMeta . getScheduleWorkerGroupId ( ) ) {
scheduleObj . setWorkerGroupId ( processMeta . getScheduleWorkerGroupId ( ) ) ;
} else {
if ( null ! = processMeta . getScheduleWorkerGroupName ( ) ) {
List < WorkerGroup > workerGroups = workerGroupMapper . queryWorkerGroupByName ( processMeta . getScheduleWorkerGroupName ( ) ) ;
if ( CollectionUtils . isNotEmpty ( workerGroups ) ) {
scheduleObj . setWorkerGroupId ( workerGroups . get ( 0 ) . getId ( ) ) ;
} else {
putMsg ( result , Status . IMPORT_PROCESS_DEFINE_ERROR ) ;
return result ;
} catch ( IOException e ) {
throw new RuntimeException ( e . getMessage ( ) , e ) ;
return result ;
return scheduleMapper . insert ( scheduleObj ) ;
/ * *
@ -860,6 +789,8 @@ public class ProcessDefinitionService extends BaseDAGService {
* recursion create sub process
* @param loginUser login user
* @param targetProject target project
* @param jsonArray process task array
* @param subProcessIdMap correct sub process id map
* /
public void importSubProcess ( User loginUser , Project targetProject , JSONArray jsonArray , Map < Integer , Integer > subProcessIdMap ) {
for ( int i = 0 ; i < jsonArray . size ( ) ; i + + ) {
@ -871,66 +802,68 @@ public class ProcessDefinitionService extends BaseDAGService {
JSONObject subParams = JSONUtils . parseObject ( taskNode . getString ( "params" ) ) ;
Integer subProcessId = subParams . getInteger ( "processDefinitionId" ) ;
ProcessDefinition subProcess = processDefineMapper . queryByDefineId ( subProcessId ) ;
String subProcessJson = subProcess . getProcessDefinitionJson ( ) ;
//check current project has sub process
ProcessDefinition currentProjectSubProcess = processDefineMapper . queryByDefineName ( targetProject . getId ( ) , subProcess . getName ( ) ) ;
if ( null = = currentProjectSubProcess ) {
JSONArray subJsonArray = ( JSONArray ) JSONUtils . parseObject ( subProcess . getProcessDefinitionJson ( ) ) . get ( "tasks" ) ;
List < Object > subProcessList = subJsonArray . stream ( )
. filter ( item - > checkTaskHasSubProcess ( JSONUtils . parseObject ( item . toString ( ) ) . getString ( "type" ) ) )
. collect ( Collectors . toList ( ) ) ;
if ( ! subProcessList . isEmpty ( ) ) {
importSubProcess ( loginUser , targetProject , subJsonArray , subProcessIdMap ) ;
//sub process processId correct
if ( ! subProcessIdMap . isEmpty ( ) ) {
for ( Map . Entry < Integer , Integer > entry : subProcessIdMap . entrySet ( ) ) {
String oldSubProcessId = "\"processDefinitionId\":" + entry . getKey ( ) ;
String newSubProcessId = "\"processDefinitionId\":" + entry . getValue ( ) ;
subProcessJson = subProcessJson . replaceAll ( oldSubProcessId , newSubProcessId ) ;
//check is sub process exist in db
if ( null ! = subProcess ) {
String subProcessJson = subProcess . getProcessDefinitionJson ( ) ;
//check current project has sub process
ProcessDefinition currentProjectSubProcess = processDefineMapper . queryByDefineName ( targetProject . getId ( ) , subProcess . getName ( ) ) ;
if ( null = = currentProjectSubProcess ) {
JSONArray subJsonArray = ( JSONArray ) JSONUtils . parseObject ( subProcess . getProcessDefinitionJson ( ) ) . get ( "tasks" ) ;
List < Object > subProcessList = subJsonArray . stream ( )
. filter ( item - > checkTaskHasSubProcess ( JSONUtils . parseObject ( item . toString ( ) ) . getString ( "type" ) ) )
. collect ( Collectors . toList ( ) ) ;
if ( CollectionUtils . isNotEmpty ( subProcessList ) ) {
importSubProcess ( loginUser , targetProject , subJsonArray , subProcessIdMap ) ;
//sub process processId correct
if ( ! subProcessIdMap . isEmpty ( ) ) {
for ( Map . Entry < Integer , Integer > entry : subProcessIdMap . entrySet ( ) ) {
String oldSubProcessId = "\"processDefinitionId\":" + entry . getKey ( ) ;
String newSubProcessId = "\"processDefinitionId\":" + entry . getValue ( ) ;
subProcessJson = subProcessJson . replaceAll ( oldSubProcessId , newSubProcessId ) ;
subProcessIdMap . clear ( ) ;
subProcessIdMap . clear ( ) ;
//if sub-process recursion
Date now = new Date ( ) ;
//create sub process in target project
ProcessDefinition processDefine = new ProcessDefinition ( ) ;
processDefine . setName ( subProcess . getName ( ) ) ;
processDefine . setVersion ( subProcess . getVersion ( ) ) ;
processDefine . setReleaseState ( subProcess . getReleaseState ( ) ) ;
processDefine . setProjectId ( targetProject . getId ( ) ) ;
processDefine . setUserId ( loginUser . getId ( ) ) ;
processDefine . setProcessDefinitionJson ( subProcessJson ) ;
processDefine . setDescription ( subProcess . getDescription ( ) ) ;
processDefine . setLocations ( subProcess . getLocations ( ) ) ;
processDefine . setConnects ( subProcess . getConnects ( ) ) ;
processDefine . setTimeout ( subProcess . getTimeout ( ) ) ;
processDefine . setTenantId ( subProcess . getTenantId ( ) ) ;
processDefine . setGlobalParams ( subProcess . getGlobalParams ( ) ) ;
processDefine . setCreateTime ( now ) ;
processDefine . setUpdateTime ( now ) ;
processDefine . setFlag ( subProcess . getFlag ( ) ) ;
processDefine . setReceivers ( subProcess . getReceivers ( ) ) ;
processDefine . setReceiversCc ( subProcess . getReceiversCc ( ) ) ;
processDefineMapper . insert ( processDefine ) ;
logger . info ( "create sub process, project: {}, process name: {}" , targetProject . getName ( ) , processDefine . getName ( ) ) ;
//modify task node
ProcessDefinition newSubProcessDefine = processDefineMapper . queryByDefineName ( processDefine . getProjectId ( ) , processDefine . getName ( ) ) ;
if ( null ! = newSubProcessDefine ) {
subProcessIdMap . put ( subProcessId , newSubProcessDefine . getId ( ) ) ;
subParams . put ( "processDefinitionId" , newSubProcessDefine . getId ( ) ) ;
taskNode . put ( "params" , subParams ) ;
//if sub-process recursion
Date now = new Date ( ) ;
//create sub process in target project
ProcessDefinition processDefine = new ProcessDefinition ( ) ;
processDefine . setName ( subProcess . getName ( ) ) ;
processDefine . setVersion ( subProcess . getVersion ( ) ) ;
processDefine . setReleaseState ( subProcess . getReleaseState ( ) ) ;
processDefine . setProjectId ( targetProject . getId ( ) ) ;
processDefine . setUserId ( loginUser . getId ( ) ) ;
processDefine . setProcessDefinitionJson ( subProcessJson ) ;
processDefine . setDescription ( subProcess . getDescription ( ) ) ;
processDefine . setLocations ( subProcess . getLocations ( ) ) ;
processDefine . setConnects ( subProcess . getConnects ( ) ) ;
processDefine . setTimeout ( subProcess . getTimeout ( ) ) ;
processDefine . setTenantId ( subProcess . getTenantId ( ) ) ;
processDefine . setGlobalParams ( subProcess . getGlobalParams ( ) ) ;
processDefine . setCreateTime ( now ) ;
processDefine . setUpdateTime ( now ) ;
processDefine . setFlag ( subProcess . getFlag ( ) ) ;
processDefine . setReceivers ( subProcess . getReceivers ( ) ) ;
processDefine . setReceiversCc ( subProcess . getReceiversCc ( ) ) ;
processDefineMapper . insert ( processDefine ) ;
logger . info ( "create sub process, project: {}, process name: {}" , targetProject . getName ( ) , processDefine . getName ( ) ) ;
//modify task node
ProcessDefinition newSubProcessDefine = processDefineMapper . queryByDefineName ( processDefine . getProjectId ( ) , processDefine . getName ( ) ) ;
if ( null ! = newSubProcessDefine ) {
subProcessIdMap . put ( subProcessId , newSubProcessDefine . getId ( ) ) ;
subParams . put ( "processDefinitionId" , newSubProcessDefine . getId ( ) ) ;
taskNode . put ( "params" , subParams ) ;
@ -1316,7 +1249,7 @@ public class ProcessDefinitionService extends BaseDAGService {
private String recursionProcessDefinitionName ( Integer projectId , String processDefinitionName , int num ) {
ProcessDefinition processDefinition = processDefineMapper . queryByDefineName ( projectId , processDefinitionName ) ;
if ( processDefinition ! = null ) {
if ( num > 1 ) {
if ( num > 1 ) {
String str = processDefinitionName . substring ( 0 , processDefinitionName . length ( ) - 3 ) ;
processDefinitionName = str + "(" + num + ")" ;
} else {