@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE ;
import org.apache.dolphinscheduler.api.dto.DagDataSchedule ;
import org.apache.dolphinscheduler.api.dto.DagDataSchedule ;
import org.apache.dolphinscheduler.api.dto.ScheduleParam ;
import org.apache.dolphinscheduler.api.dto.treeview.Instance ;
import org.apache.dolphinscheduler.api.dto.treeview.Instance ;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto ;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto ;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.api.enums.Status ;
@ -33,9 +34,12 @@ import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo ;
import org.apache.dolphinscheduler.api.utils.PageInfo ;
import org.apache.dolphinscheduler.api.utils.Result ;
import org.apache.dolphinscheduler.api.utils.Result ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.FailureStrategy ;
import org.apache.dolphinscheduler.common.enums.Priority ;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum ;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum ;
import org.apache.dolphinscheduler.common.enums.ReleaseState ;
import org.apache.dolphinscheduler.common.enums.ReleaseState ;
import org.apache.dolphinscheduler.common.enums.UserType ;
import org.apache.dolphinscheduler.common.enums.UserType ;
import org.apache.dolphinscheduler.common.enums.WarningType ;
import org.apache.dolphinscheduler.common.graph.DAG ;
import org.apache.dolphinscheduler.common.graph.DAG ;
import org.apache.dolphinscheduler.common.model.TaskNode ;
import org.apache.dolphinscheduler.common.model.TaskNode ;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation ;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation ;
@ -1533,6 +1537,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return process definition code
* @return process definition code
* /
* /
@Override
@Override
@Transactional ( rollbackFor = RuntimeException . class )
public Map < String , Object > createEmptyProcessDefinition ( User loginUser ,
public Map < String , Object > createEmptyProcessDefinition ( User loginUser ,
long projectCode ,
long projectCode ,
String name ,
String name ,
@ -1542,7 +1547,98 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
String tenantCode ,
String tenantCode ,
String scheduleJson ,
String scheduleJson ,
ProcessExecutionTypeEnum executionType ) {
ProcessExecutionTypeEnum executionType ) {
return null ;
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper . verifyByDefineName ( project . getCode ( ) , name ) ;
if ( definition ! = null ) {
putMsg ( result , Status . PROCESS_DEFINITION_NAME_EXIST , name ) ;
return result ;
}
int tenantId = - 1 ;
if ( ! Constants . DEFAULT . equals ( tenantCode ) ) {
Tenant tenant = tenantMapper . queryByTenantCode ( tenantCode ) ;
if ( tenant = = null ) {
putMsg ( result , Status . TENANT_NOT_EXIST ) ;
return result ;
}
tenantId = tenant . getId ( ) ;
}
long processDefinitionCode ;
try {
processDefinitionCode = CodeGenerateUtils . getInstance ( ) . genCode ( ) ;
} catch ( CodeGenerateException e ) {
putMsg ( result , Status . INTERNAL_SERVER_ERROR_ARGS ) ;
return result ;
}
ProcessDefinition processDefinition = new ProcessDefinition ( projectCode , name , processDefinitionCode , description ,
globalParams , "" , timeout , loginUser . getId ( ) , tenantId ) ;
processDefinition . setExecutionType ( executionType ) ;
result = createEmptyDagDefine ( loginUser , processDefinition ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
if ( scheduleJson = = null | | scheduleJson . trim ( ) . isEmpty ( ) ) {
return result ;
}
// save dag schedule
Map < String , Object > scheduleResult = createDagSchedule ( loginUser , projectCode , processDefinitionCode , scheduleJson ) ;
if ( scheduleResult . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return scheduleResult ;
}
return result ;
}
private Map < String , Object > createEmptyDagDefine ( User loginUser , ProcessDefinition processDefinition ) {
Map < String , Object > result = new HashMap < > ( ) ;
int insertVersion = processService . saveProcessDefine ( loginUser , processDefinition , true ) ;
if ( insertVersion = = 0 ) {
putMsg ( result , Status . CREATE_PROCESS_DEFINITION_ERROR ) ;
throw new ServiceException ( Status . CREATE_PROCESS_DEFINITION_ERROR ) ;
}
putMsg ( result , Status . SUCCESS ) ;
result . put ( Constants . DATA_LIST , processDefinition ) ;
return result ;
}
private Map < String , Object > createDagSchedule ( User loginUser ,
long projectCode ,
long processDefinitionCode ,
String scheduleJson ) {
Schedule schedule = JSONUtils . parseObject ( scheduleJson , Schedule . class ) ;
// set default value
FailureStrategy failureStrategy = schedule . getFailureStrategy ( ) = = null ? FailureStrategy . CONTINUE : schedule . getFailureStrategy ( ) ;
WarningType warningType = schedule . getWarningType ( ) = = null ? WarningType . NONE : schedule . getWarningType ( ) ;
Priority processInstancePriority = schedule . getProcessInstancePriority ( ) = = null ? Priority . MEDIUM : schedule . getProcessInstancePriority ( ) ;
int warningGroupId = schedule . getWarningGroupId ( ) = = 0 ? 1 : schedule . getWarningGroupId ( ) ;
String workerGroup = schedule . getWorkerGroup ( ) = = null ? "default" : schedule . getWorkerGroup ( ) ;
Long environmentCode = schedule . getEnvironmentCode ( ) = = null ? - 1 : schedule . getEnvironmentCode ( ) ;
ScheduleParam param = new ScheduleParam ( ) ;
param . setStartTime ( schedule . getStartTime ( ) ) ;
param . setEndTime ( schedule . getEndTime ( ) ) ;
param . setCrontab ( schedule . getCrontab ( ) ) ;
param . setTimezoneId ( schedule . getTimezoneId ( ) ) ;
return schedulerService . insertSchedule (
loginUser ,
projectCode ,
processDefinitionCode ,
JSONUtils . toJsonString ( param ) ,
warningType ,
warningGroupId ,
failureStrategy ,
processInstancePriority ,
workerGroup ,
environmentCode ) ;
}
}
/ * *
/ * *
@ -1571,7 +1667,111 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
String tenantCode ,
String tenantCode ,
String scheduleJson ,
String scheduleJson ,
ProcessExecutionTypeEnum executionType ) {
ProcessExecutionTypeEnum executionType ) {
return null ;
Project project = projectMapper . queryByCode ( projectCode ) ;
//check user access for project
Map < String , Object > result = projectService . checkProjectAndAuth ( loginUser , project , projectCode ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
int tenantId = - 1 ;
if ( ! Constants . DEFAULT . equals ( tenantCode ) ) {
Tenant tenant = tenantMapper . queryByTenantCode ( tenantCode ) ;
if ( tenant = = null ) {
putMsg ( result , Status . TENANT_NOT_EXIST ) ;
return result ;
}
tenantId = tenant . getId ( ) ;
}
ProcessDefinition processDefinition = processDefinitionMapper . queryByCode ( code ) ;
// check process definition exists
if ( processDefinition = = null ) {
putMsg ( result , Status . PROCESS_DEFINE_NOT_EXIST , code ) ;
return result ;
}
if ( processDefinition . getReleaseState ( ) = = ReleaseState . ONLINE ) {
// online can not permit edit
putMsg ( result , Status . PROCESS_DEFINE_NOT_ALLOWED_EDIT , processDefinition . getName ( ) ) ;
return result ;
}
if ( ! name . equals ( processDefinition . getName ( ) ) ) {
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper . verifyByDefineName ( project . getCode ( ) , name ) ;
if ( definition ! = null ) {
putMsg ( result , Status . PROCESS_DEFINITION_NAME_EXIST , name ) ;
return result ;
}
}
ProcessDefinition processDefinitionDeepCopy = JSONUtils . parseObject ( JSONUtils . toJsonString ( processDefinition ) , ProcessDefinition . class ) ;
processDefinition . set ( projectCode , name , description , globalParams , "" , timeout , tenantId ) ;
processDefinition . setExecutionType ( executionType ) ;
result = updateDagDefineBasicInfo ( loginUser , processDefinition , processDefinitionDeepCopy ) ;
if ( result . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return result ;
}
if ( scheduleJson = = null | | scheduleJson . trim ( ) . isEmpty ( ) ) {
return result ;
}
// update dag schedule
Map < String , Object > scheduleResult = updateDagSchedule ( loginUser , projectCode , code , scheduleJson ) ;
if ( scheduleResult . get ( Constants . STATUS ) ! = Status . SUCCESS ) {
return scheduleResult ;
}
return result ;
}
private Map < String , Object > updateDagDefineBasicInfo ( User loginUser ,
ProcessDefinition processDefinition ,
ProcessDefinition processDefinitionDeepCopy ) {
Map < String , Object > result = new HashMap < > ( ) ;
int insertVersion ;
if ( processDefinition . equals ( processDefinitionDeepCopy ) ) {
insertVersion = processDefinitionDeepCopy . getVersion ( ) ;
} else {
processDefinition . setUpdateTime ( new Date ( ) ) ;
insertVersion = processService . saveProcessDefine ( loginUser , processDefinition , true ) ;
}
if ( insertVersion = = 0 ) {
putMsg ( result , Status . UPDATE_PROCESS_DEFINITION_ERROR ) ;
throw new ServiceException ( Status . UPDATE_PROCESS_DEFINITION_ERROR ) ;
}
putMsg ( result , Status . SUCCESS ) ;
result . put ( Constants . DATA_LIST , processDefinition ) ;
return result ;
}
private Map < String , Object > updateDagSchedule ( User loginUser ,
long projectCode ,
long processDefinitionCode ,
String scheduleJson ) {
Schedule schedule = JSONUtils . parseObject ( scheduleJson , Schedule . class ) ;
// set default value
FailureStrategy failureStrategy = schedule . getFailureStrategy ( ) = = null ? FailureStrategy . CONTINUE : schedule . getFailureStrategy ( ) ;
WarningType warningType = schedule . getWarningType ( ) = = null ? WarningType . NONE : schedule . getWarningType ( ) ;
Priority processInstancePriority = schedule . getProcessInstancePriority ( ) = = null ? Priority . MEDIUM : schedule . getProcessInstancePriority ( ) ;
int warningGroupId = schedule . getWarningGroupId ( ) = = 0 ? 1 : schedule . getWarningGroupId ( ) ;
String workerGroup = schedule . getWorkerGroup ( ) = = null ? "default" : schedule . getWorkerGroup ( ) ;
Long environmentCode = schedule . getEnvironmentCode ( ) = = null ? - 1 : schedule . getEnvironmentCode ( ) ;
ScheduleParam param = new ScheduleParam ( ) ;
param . setStartTime ( schedule . getStartTime ( ) ) ;
param . setEndTime ( schedule . getEndTime ( ) ) ;
param . setCrontab ( schedule . getCrontab ( ) ) ;
param . setTimezoneId ( schedule . getTimezoneId ( ) ) ;
return schedulerService . updateScheduleByProcessDefinitionCode (
loginUser ,
projectCode ,
processDefinitionCode ,
JSONUtils . toJsonString ( param ) ,
warningType ,
warningGroupId ,
failureStrategy ,
processInstancePriority ,
workerGroup ,
environmentCode ) ;
}
}
/ * *
/ * *