@ -27,9 +27,10 @@ import org.apache.dolphinscheduler.api.dto.treeview.Instance;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto ;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.api.utils.CheckUtils ;
import org.apache.dolphinscheduler.api.utils.FileUtils ;
import org.apache.dolphinscheduler.api.utils.PageInfo ;
import org.apache.dolphinscheduler.api.utils.exportprocess.ProcessAddTaskParam ;
import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory ;
import org.apache.dolphinscheduler.api.utils.exportprocess.exportProcessAddTaskParam ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.* ;
import org.apache.dolphinscheduler.common.graph.DAG ;
@ -56,9 +57,7 @@ import org.springframework.web.multipart.MultipartFile;
import javax.servlet.ServletOutputStream ;
import javax.servlet.http.HttpServletResponse ;
import java.io.BufferedOutputStream ;
import java.io.BufferedReader ;
import java.io.IOException ;
import java.io.InputStreamReader ;
import java.nio.charset.StandardCharsets ;
import java.util.* ;
import java.util.concurrent.ConcurrentHashMap ;
@ -97,9 +96,6 @@ public class ProcessDefinitionService extends BaseDAGService {
@Autowired
private ProcessDao processDao ;
@Autowired
private DataSourceMapper dataSourceMapper ;
@Autowired
private WorkerGroupMapper workerGroupMapper ;
@ -540,7 +536,7 @@ public class ProcessDefinitionService extends BaseDAGService {
* /
public String exportProcessMetaDataStr ( Integer processDefinitionId , ProcessDefinition processDefinition ) {
//correct task param which has data source or dependent param
String correctProcessDefinitionJson = addTaskNodeSpecialParam ( processDefinition . getProcessDefinitionJson ( ) ) ;
String correctProcessDefinitionJson = addExport TaskNodeSpecialParam ( processDefinition . getProcessDefinitionJson ( ) ) ;
processDefinition . setProcessDefinitionJson ( correctProcessDefinitionJson ) ;
//export process metadata
@ -586,7 +582,7 @@ public class ProcessDefinitionService extends BaseDAGService {
* @param processDefinitionJson processDefinitionJson
* @return correct processDefinitionJson
* /
public String addTaskNodeSpecialParam ( String processDefinitionJson ) {
public String addExport TaskNodeSpecialParam ( String processDefinitionJson ) {
JSONObject jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
JSONArray jsonArray = ( JSONArray ) jsonObject . get ( "tasks" ) ;
@ -595,9 +591,9 @@ public class ProcessDefinitionService extends BaseDAGService {
if ( StringUtils . isNotEmpty ( taskNode . getString ( "type" ) ) ) {
String taskType = taskNode . getString ( "type" ) ;
export ProcessAddTaskParam addTaskParam = TaskNodeParamFactory . getByTaskType ( taskType ) ;
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory . getByTaskType ( taskType ) ;
if ( null ! = addTaskParam ) {
addTaskParam . addSpecialParam ( taskNode ) ;
addTaskParam . addExport SpecialParam ( taskNode ) ;
}
}
}
@ -605,24 +601,6 @@ public class ProcessDefinitionService extends BaseDAGService {
return jsonObject . toString ( ) ;
}
/ * *
* check task if has dependent
* @param taskType task type
* @return if task has dependent return true else false
* /
private boolean checkTaskHasDependent ( String taskType ) {
return taskType . equals ( TaskType . DEPENDENT . name ( ) ) ;
}
/ * *
* check task if has data source info
* @param taskType task type
* @return if task has data source return true else false
* /
private boolean checkTaskHasDataSource ( String taskType ) {
return taskType . equals ( TaskType . SQL . name ( ) ) | | taskType . equals ( TaskType . PROCEDURE . name ( ) ) ;
}
/ * *
* check task if has sub process
* @param taskType task type
@ -642,206 +620,168 @@ public class ProcessDefinitionService extends BaseDAGService {
@Transactional ( rollbackFor = Exception . class )
public Map < String , Object > importProcessDefinition ( User loginUser , MultipartFile file , String currentProjectName ) {
Map < String , Object > result = new HashMap < > ( 5 ) ;
String processMetaJson = FileUtils . file2String ( file ) ;
ProcessMeta processMeta = JSONUtils . parseObject ( processMetaJson , ProcessMeta . class ) ;
JSONObject json ;
//check file content
if ( null = = processMeta ) {
putMsg ( result , Status . DATA_IS_NULL , "fileContent" ) ;
return result ;
}
if ( StringUtils . isEmpty ( processMeta . getProjectName ( ) ) ) {
putMsg ( result , Status . DATA_IS_NULL , "projectName" ) ;
return result ;
}
if ( StringUtils . isEmpty ( processMeta . getProcessDefinitionName ( ) ) ) {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionName" ) ;
return result ;
}
if ( StringUtils . isEmpty ( processMeta . getProcessDefinitionJson ( ) ) ) {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionJson" ) ;
return result ;
}
//read workflow json
try ( InputStreamReader inputStreamReader = new InputStreamReader ( file . getInputStream ( ) , StandardCharsets . UTF_8 ) ) {
BufferedReader streamReader = new BufferedReader ( inputStreamReader ) ;
StringBuilder respomseStrBuilder = new StringBuilder ( ) ;
String inputStr ;
//deal with process name
String processDefinitionName = processMeta . getProcessDefinitionName ( ) ;
//use currentProjectName to query
Project targetProject = projectMapper . queryByName ( currentProjectName ) ;
if ( null ! = targetProject ) {
processDefinitionName = recursionProcessDefinitionName ( targetProject . getId ( ) ,
processDefinitionName , 1 ) ;
}
while ( ( inputStr = streamReader . readLine ( ) ) ! = null ) {
respomseStrBuilder . append ( inputStr ) ;
}
//add special task param
String importProcessParam = addImportTaskNodeParam ( loginUser , processMeta . getProcessDefinitionJson ( ) , targetProject ) ;
json = JSONObject . parseObject ( respomseStrBuilder . toString ( ) ) ;
if ( null ! = json ) {
String originProjectName = null ;
String processDefinitionName = null ;
String processDefinitionJson = null ;
String processDefinitionDesc = null ;
String processDefinitionLocations = null ;
String processDefinitionConnects = null ;
String scheduleWarningType = null ;
String scheduleWarningGroupId = null ;
String scheduleStartTime = null ;
String scheduleEndTime = null ;
String scheduleCrontab = null ;
String scheduleFailureStrategy = null ;
String scheduleReleaseState = null ;
String scheduleProcessInstancePriority = null ;
String scheduleWorkerGroupId = null ;
String scheduleWorkerGroupName = null ;
if ( Objects . nonNull ( json . get ( "projectName" ) ) ) {
originProjectName = json . get ( "projectName" ) . toString ( ) ;
} else {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionName" ) ;
return result ;
}
if ( Objects . nonNull ( json . get ( "processDefinitionName" ) ) ) {
processDefinitionName = json . get ( "processDefinitionName" ) . toString ( ) ;
} else {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionName" ) ;
return result ;
}
if ( Objects . nonNull ( json . get ( "processDefinitionJson" ) ) ) {
processDefinitionJson = json . get ( "processDefinitionJson" ) . toString ( ) ;
} else {
putMsg ( result , Status . DATA_IS_NULL , "processDefinitionJson" ) ;
return result ;
}
if ( Objects . nonNull ( json . get ( "processDefinitionDescription" ) ) ) {
processDefinitionDesc = json . get ( "processDefinitionDescription" ) . toString ( ) ;
}
if ( Objects . nonNull ( json . get ( "processDefinitionLocations" ) ) ) {
processDefinitionLocations = json . get ( "processDefinitionLocations" ) . toString ( ) ;
}
if ( Objects . nonNull ( json . get ( "processDefinitionConnects" ) ) ) {
processDefinitionConnects = json . get ( "processDefinitionConnects" ) . toString ( ) ;
}
//check user access for org project
Project originProject = projectMapper . queryByName ( originProjectName ) ;
Map < String , Object > checkResult = projectService . checkProjectAndAuth ( loginUser , originProject , originProjectName ) ;
Status resultStatus = ( Status ) checkResult . get ( Constants . STATUS ) ;
Map < String , Object > createProcessResult ;
try {
createProcessResult = createProcessDefinition ( loginUser
, currentProjectName ,
processDefinitionName ,
importProcessParam ,
processMeta . getProcessDefinitionDescription ( ) ,
processMeta . getProcessDefinitionLocations ( ) ,
processMeta . getProcessDefinitionConnects ( ) ) ;
} catch ( JsonProcessingException e ) {
logger . error ( "import process meta json data: {}" , e . getMessage ( ) , e ) ;
putMsg ( result , Status . IMPORT_PROCESS_DEFINE_ERROR ) ;
return result ;
}
if ( resultStatus = = Status . SUCCESS ) {
//use currentProjectName to query
Project targetProject = projectMapper . queryByName ( currentProjectName ) ;
if ( null ! = targetProject ) {
processDefinitionName = recursionProcessDefinitionName ( targetProject . getId ( ) , processDefinitionName , 1 ) ;
}
putMsg ( result , Status . SUCCESS ) ;
//create process definition
Integer processDefinitionId = null ;
if ( null ! = createProcessResult & & Objects . nonNull ( createProcessResult . get ( "processDefinitionId" ) ) ) {
processDefinitionId = Integer . parseInt ( createProcessResult . get ( "processDefinitionId" ) . toString ( ) ) ;
}
//scheduler param
if ( null ! = processMeta . getScheduleCrontab ( ) & & null ! = processDefinitionId ) {
int scheduleInsert = importProcessSchedule ( loginUser ,
currentProjectName ,
processMeta ,
processDefinitionName ,
processDefinitionId ) ;
if ( 0 = = scheduleInsert ) {
putMsg ( result , Status . IMPORT_PROCESS_DEFINE_ERROR ) ;
return result ;
}
}
JSONObject jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
JSONArray jsonArray = ( JSONArray ) jsonObject . get ( "tasks" ) ;
for ( int j = 0 ; j < jsonArray . size ( ) ; j + + ) {
JSONObject taskNode = jsonArray . getJSONObject ( j ) ;
String taskType = taskNode . getString ( "type" ) ;
if ( checkTaskHasDataSource ( taskType ) ) {
JSONObject sqlParameters = JSONUtils . parseObject ( taskNode . getString ( "params" ) ) ;
List < DataSource > dataSources = dataSourceMapper . queryDataSourceByName ( sqlParameters . getString ( "datasourceName" ) ) ;
if ( ! dataSources . isEmpty ( ) ) {
DataSource dataSource = dataSources . get ( 0 ) ;
sqlParameters . put ( "datasource" , dataSource . getId ( ) ) ;
}
taskNode . put ( "params" , sqlParameters ) ;
} else if ( checkTaskHasDependent ( taskType ) ) {
JSONObject dependentParameters = JSONUtils . parseObject ( taskNode . getString ( "dependence" ) ) ;
if ( dependentParameters ! = null ) {
JSONArray dependTaskList = ( JSONArray ) dependentParameters . get ( "dependTaskList" ) ;
for ( int h = 0 ; h < dependTaskList . size ( ) ; h + + ) {
JSONObject dependentTaskModel = dependTaskList . getJSONObject ( h ) ;
JSONArray dependItemList = ( JSONArray ) dependentTaskModel . get ( "dependItemList" ) ;
for ( int k = 0 ; k < dependItemList . size ( ) ; k + + ) {
JSONObject dependentItem = dependItemList . getJSONObject ( k ) ;
Project dependentItemProject = projectMapper . queryByName ( dependentItem . getString ( "projectName" ) ) ;
if ( dependentItemProject ! = null ) {
ProcessDefinition definition = processDefineMapper . queryByDefineName ( dependentItemProject . getId ( ) , dependentItem . getString ( "definitionName" ) ) ;
if ( definition ! = null ) {
dependentItem . put ( "projectId" , dependentItemProject . getId ( ) ) ;
dependentItem . put ( "definitionId" , definition . getId ( ) ) ;
}
}
}
}
taskNode . put ( "dependence" , dependentParameters ) ;
}
}
}
return result ;
}
//recursive sub-process parameter correction map key for old process id value for new process id
Map < Integer , Integer > subProcessIdMap = new HashMap < > ( 20 ) ;
/ * *
* import process add special task param
* @param loginUser login user
* @param processDefinitionJson process definition json
* @param targetProject target project
* @return import process param
* /
private String addImportTaskNodeParam ( User loginUser , String processDefinitionJson , Project targetProject ) {
JSONObject jsonObject = JSONUtils . parseObject ( processDefinitionJson ) ;
JSONArray jsonArray = ( JSONArray ) jsonObject . get ( "tasks" ) ;
//add sql and dependent param
for ( int i = 0 ; i < jsonArray . size ( ) ; i + + ) {
JSONObject taskNode = jsonArray . getJSONObject ( i ) ;
String taskType = taskNode . getString ( "type" ) ;
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory . getByTaskType ( taskType ) ;
if ( null ! = addTaskParam ) {
addTaskParam . addImportSpecialParam ( taskNode ) ;
}
}
List < Object > subProcessList = jsonArray . stream ( )
. filter ( elem - > checkTaskHasSubProcess ( JSONUtils . parseObject ( elem . toString ( ) ) . getString ( "type" ) ) )
. collect ( Collectors . toList ( ) ) ;
//recursive sub-process parameter correction map key for old process id value for new process id
Map < Integer , Integer > subProcessIdMap = new HashMap < > ( 20 ) ;
if ( ! subProcessList . isEmpty ( ) ) {
importSubProcess ( loginUser , targetProject , jsonArray , subProcessIdMap ) ;
}
List < Object > subProcessList = jsonArray . stream ( )
. filter ( elem - > checkTaskHasSubProcess ( JSONUtils . parseObject ( elem . toString ( ) ) . getString ( "type" ) ) )
. collect ( Collectors . toList ( ) ) ;
jsonObject . put ( "tasks" , jsonArray ) ;
if ( CollectionUtils . isNotEmpty ( subProcessList ) ) {
importSubProcess ( loginUser , targetProject , jsonArray , subProcessIdMap ) ;
}
Map < String , Object > createProcessDefinitionResult = createProcessDefinition ( loginUser , currentProjectName , processDefinitionName , jsonObject . toString ( ) , processDefinitionDesc , processDefinitionLocations , processDefinitionConnects ) ;
Integer processDefinitionId = null ;
if ( Objects . nonNull ( createProcessDefinitionResult . get ( "processDefinitionId" ) ) ) {
processDefinitionId = Integer . parseInt ( createProcessDefinitionResult . get ( "processDefinitionId" ) . toString ( ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleCrontab" ) ) & & processDefinitionId ! = null ) {
Date now = new Date ( ) ;
Schedule scheduleObj = new Schedule ( ) ;
scheduleObj . setProjectName ( currentProjectName ) ;
scheduleObj . setProcessDefinitionId ( processDefinitionId ) ;
scheduleObj . setProcessDefinitionName ( processDefinitionName ) ;
scheduleObj . setCreateTime ( now ) ;
scheduleObj . setUpdateTime ( now ) ;
scheduleObj . setUserId ( loginUser . getId ( ) ) ;
scheduleObj . setUserName ( loginUser . getUserName ( ) ) ;
scheduleCrontab = json . get ( "scheduleCrontab" ) . toString ( ) ;
scheduleObj . setCrontab ( scheduleCrontab ) ;
if ( Objects . nonNull ( json . get ( "scheduleStartTime" ) ) ) {
scheduleStartTime = json . get ( "scheduleStartTime" ) . toString ( ) ;
scheduleObj . setStartTime ( DateUtils . stringToDate ( scheduleStartTime ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleEndTime" ) ) ) {
scheduleEndTime = json . get ( "scheduleEndTime" ) . toString ( ) ;
scheduleObj . setEndTime ( DateUtils . stringToDate ( scheduleEndTime ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleWarningType" ) ) ) {
scheduleWarningType = json . get ( "scheduleWarningType" ) . toString ( ) ;
scheduleObj . setWarningType ( WarningType . valueOf ( scheduleWarningType ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleWarningGroupId" ) ) ) {
scheduleWarningGroupId = json . get ( "scheduleWarningGroupId" ) . toString ( ) ;
scheduleObj . setWarningGroupId ( Integer . parseInt ( scheduleWarningGroupId ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleFailureStrategy" ) ) ) {
scheduleFailureStrategy = json . get ( "scheduleFailureStrategy" ) . toString ( ) ;
scheduleObj . setFailureStrategy ( FailureStrategy . valueOf ( scheduleFailureStrategy ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleReleaseState" ) ) ) {
scheduleReleaseState = json . get ( "scheduleReleaseState" ) . toString ( ) ;
scheduleObj . setReleaseState ( ReleaseState . valueOf ( scheduleReleaseState ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleProcessInstancePriority" ) ) ) {
scheduleProcessInstancePriority = json . get ( "scheduleProcessInstancePriority" ) . toString ( ) ;
scheduleObj . setProcessInstancePriority ( Priority . valueOf ( scheduleProcessInstancePriority ) ) ;
}
if ( Objects . nonNull ( json . get ( "scheduleWorkerGroupId" ) ) ) {
scheduleWorkerGroupId = json . get ( "scheduleWorkerGroupId" ) . toString ( ) ;
if ( scheduleWorkerGroupId ! = null ) {
scheduleObj . setWorkerGroupId ( Integer . parseInt ( scheduleWorkerGroupId ) ) ;
} else {
if ( Objects . nonNull ( json . get ( "scheduleWorkerGroupName" ) ) ) {
scheduleWorkerGroupName = json . get ( "scheduleWorkerGroupName" ) . toString ( ) ;
List < WorkerGroup > workerGroups = workerGroupMapper . queryWorkerGroupByName ( scheduleWorkerGroupName ) ;
if ( ! workerGroups . isEmpty ( ) ) {
scheduleObj . setWorkerGroupId ( workerGroups . get ( 0 ) . getId ( ) ) ;
}
}
}
}
scheduleMapper . insert ( scheduleObj ) ;
}
jsonObject . put ( "tasks" , jsonArray ) ;
return jsonObject . toString ( ) ;
}
putMsg ( result , Status . SUCCESS ) ;
return result ;
/ * *
* import process schedule
* @param loginUser login user
* @param currentProjectName current project name
* @param processMeta process meta data
* @param processDefinitionName process definition name
* @param processDefinitionId process definition id
* @return insert schedule flag
* /
public int importProcessSchedule ( User loginUser , String currentProjectName , ProcessMeta processMeta ,
String processDefinitionName , Integer processDefinitionId ) {
Date now = new Date ( ) ;
Schedule scheduleObj = new Schedule ( ) ;
scheduleObj . setProjectName ( currentProjectName ) ;
scheduleObj . setProcessDefinitionId ( processDefinitionId ) ;
scheduleObj . setProcessDefinitionName ( processDefinitionName ) ;
scheduleObj . setCreateTime ( now ) ;
scheduleObj . setUpdateTime ( now ) ;
scheduleObj . setUserId ( loginUser . getId ( ) ) ;
scheduleObj . setUserName ( loginUser . getUserName ( ) ) ;
scheduleObj . setCrontab ( processMeta . getScheduleCrontab ( ) ) ;
if ( null ! = processMeta . getScheduleStartTime ( ) ) {
scheduleObj . setStartTime ( DateUtils . stringToDate ( processMeta . getScheduleStartTime ( ) ) ) ;
}
if ( null ! = processMeta . getScheduleEndTime ( ) ) {
scheduleObj . setEndTime ( DateUtils . stringToDate ( processMeta . getScheduleEndTime ( ) ) ) ;
}
if ( null ! = processMeta . getScheduleWarningType ( ) ) {
scheduleObj . setWarningType ( WarningType . valueOf ( processMeta . getScheduleWarningType ( ) ) ) ;
}
if ( null ! = processMeta . getScheduleWarningGroupId ( ) ) {
scheduleObj . setWarningGroupId ( processMeta . getScheduleWarningGroupId ( ) ) ;
}
if ( null ! = processMeta . getScheduleFailureStrategy ( ) ) {
scheduleObj . setFailureStrategy ( FailureStrategy . valueOf ( processMeta . getScheduleFailureStrategy ( ) ) ) ;
}
if ( null ! = processMeta . getScheduleReleaseState ( ) ) {
scheduleObj . setReleaseState ( ReleaseState . valueOf ( processMeta . getScheduleReleaseState ( ) ) ) ;
}
if ( null ! = processMeta . getScheduleProcessInstancePriority ( ) ) {
scheduleObj . setProcessInstancePriority ( Priority . valueOf ( processMeta . getScheduleProcessInstancePriority ( ) ) ) ;
}
if ( null ! = processMeta . getScheduleWorkerGroupId ( ) ) {
scheduleObj . setWorkerGroupId ( processMeta . getScheduleWorkerGroupId ( ) ) ;
} else {
if ( null ! = processMeta . getScheduleWorkerGroupName ( ) ) {
List < WorkerGroup > workerGroups = workerGroupMapper . queryWorkerGroupByName ( processMeta . getScheduleWorkerGroupName ( ) ) ;
if ( CollectionUtils . isNotEmpty ( workerGroups ) ) {
scheduleObj . setWorkerGroupId ( workerGroups . get ( 0 ) . getId ( ) ) ;
}
} else {
putMsg ( result , Status . IMPORT_PROCESS_DEFINE_ERROR ) ;
return result ;
}
} catch ( IOException e ) {
throw new RuntimeException ( e . getMessage ( ) , e ) ;
}
return result ;
return scheduleMapper . insert ( scheduleObj ) ;
}
/ * *
@ -873,7 +813,7 @@ public class ProcessDefinitionService extends BaseDAGService {
. filter ( item - > checkTaskHasSubProcess ( JSONUtils . parseObject ( item . toString ( ) ) . getString ( "type" ) ) )
. collect ( Collectors . toList ( ) ) ;
if ( ! subProcessList . isEmpty ( ) ) {
if ( CollectionUtils . isNotEmpty ( subProcessList ) ) {
importSubProcess ( loginUser , targetProject , subJsonArray , subProcessIdMap ) ;
//sub process processId correct
if ( ! subProcessIdMap . isEmpty ( ) ) {
@ -1307,7 +1247,7 @@ public class ProcessDefinitionService extends BaseDAGService {
private String recursionProcessDefinitionName ( Integer projectId , String processDefinitionName , int num ) {
ProcessDefinition processDefinition = processDefineMapper . queryByDefineName ( projectId , processDefinitionName ) ;
if ( processDefinition ! = null ) {
if ( num > 1 ) {
if ( num > 1 ) {
String str = processDefinitionName . substring ( 0 , processDefinitionName . length ( ) - 3 ) ;
processDefinitionName = str + "(" + num + ")" ;
} else {