@ -33,7 +33,6 @@ import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.AuthorizationType ;
import org.apache.dolphinscheduler.common.enums.AuthorizationType ;
import org.apache.dolphinscheduler.common.enums.CommandType ;
import org.apache.dolphinscheduler.common.enums.CommandType ;
import org.apache.dolphinscheduler.common.enums.ConditionType ;
import org.apache.dolphinscheduler.common.enums.Direct ;
import org.apache.dolphinscheduler.common.enums.Direct ;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus ;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus ;
import org.apache.dolphinscheduler.common.enums.FailureStrategy ;
import org.apache.dolphinscheduler.common.enums.FailureStrategy ;
@ -58,8 +57,6 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils ;
import org.apache.dolphinscheduler.common.utils.DateUtils ;
import org.apache.dolphinscheduler.common.utils.JSONUtils ;
import org.apache.dolphinscheduler.common.utils.JSONUtils ;
import org.apache.dolphinscheduler.common.utils.ParameterUtils ;
import org.apache.dolphinscheduler.common.utils.ParameterUtils ;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils ;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException ;
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils ;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils ;
import org.apache.dolphinscheduler.dao.entity.Command ;
import org.apache.dolphinscheduler.dao.entity.Command ;
@ -104,7 +101,6 @@ import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper ;
import org.apache.dolphinscheduler.dao.mapper.UserMapper ;
import org.apache.dolphinscheduler.dao.utils.DagHelper ;
import org.apache.dolphinscheduler.dao.utils.DagHelper ;
import org.apache.dolphinscheduler.remote.utils.Host ;
import org.apache.dolphinscheduler.remote.utils.Host ;
import org.apache.dolphinscheduler.service.exceptions.ServiceException ;
import org.apache.dolphinscheduler.service.log.LogClientService ;
import org.apache.dolphinscheduler.service.log.LogClientService ;
import java.util.ArrayList ;
import java.util.ArrayList ;
@ -946,12 +942,11 @@ public class ProcessService {
* set sub work process flag , extends parent work process command parameters
* set sub work process flag , extends parent work process command parameters
*
*
* @param subProcessInstance subProcessInstance
* @param subProcessInstance subProcessInstance
* @return process instance
* /
* /
public ProcessInstance setSubProcessParam ( ProcessInstance subProcessInstance ) {
public void setSubProcessParam ( ProcessInstance subProcessInstance ) {
String cmdParam = subProcessInstance . getCommandParam ( ) ;
String cmdParam = subProcessInstance . getCommandParam ( ) ;
if ( StringUtils . isEmpty ( cmdParam ) ) {
if ( StringUtils . isEmpty ( cmdParam ) ) {
return subProcessInstance ;
return ;
}
}
Map < String , String > paramMap = JSONUtils . toMap ( cmdParam ) ;
Map < String , String > paramMap = JSONUtils . toMap ( cmdParam ) ;
// write sub process id into cmd param.
// write sub process id into cmd param.
@ -977,13 +972,12 @@ public class ProcessService {
}
}
ProcessInstanceMap processInstanceMap = JSONUtils . parseObject ( cmdParam , ProcessInstanceMap . class ) ;
ProcessInstanceMap processInstanceMap = JSONUtils . parseObject ( cmdParam , ProcessInstanceMap . class ) ;
if ( processInstanceMap = = null | | processInstanceMap . getParentProcessInstanceId ( ) = = 0 ) {
if ( processInstanceMap = = null | | processInstanceMap . getParentProcessInstanceId ( ) = = 0 ) {
return subProcessInstance ;
return ;
}
}
// update sub process id to process map table
// update sub process id to process map table
processInstanceMap . setProcessInstanceId ( subProcessInstance . getId ( ) ) ;
processInstanceMap . setProcessInstanceId ( subProcessInstance . getId ( ) ) ;
this . updateWorkProcessInstanceMap ( processInstanceMap ) ;
this . updateWorkProcessInstanceMap ( processInstanceMap ) ;
return subProcessInstance ;
}
}
/ * *
/ * *
@ -2074,43 +2068,6 @@ public class ProcessService {
return result ;
return result ;
}
}
/ * *
* update task definition
* /
public int updateTaskDefinition ( User operator , Long projectCode , TaskNode taskNode , TaskDefinition taskDefinition ) {
Integer version = taskDefinitionLogMapper . queryMaxVersionForDefinition ( taskDefinition . getCode ( ) ) ;
Date now = new Date ( ) ;
taskDefinition . setProjectCode ( projectCode ) ;
taskDefinition . setUserId ( operator . getId ( ) ) ;
taskDefinition . setVersion ( version = = null | | version = = 0 ? 1 : version + 1 ) ;
taskDefinition . setUpdateTime ( now ) ;
setTaskFromTaskNode ( taskNode , taskDefinition ) ;
int update = taskDefinitionMapper . updateById ( taskDefinition ) ;
// save task definition log
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog ( taskDefinition ) ;
taskDefinitionLog . setOperator ( operator . getId ( ) ) ;
taskDefinitionLog . setOperateTime ( now ) ;
int insert = taskDefinitionLogMapper . insert ( taskDefinitionLog ) ;
return insert & update ;
}
private void setTaskFromTaskNode ( TaskNode taskNode , TaskDefinition taskDefinition ) {
taskDefinition . setName ( taskNode . getName ( ) ) ;
taskDefinition . setDescription ( taskNode . getDesc ( ) ) ;
taskDefinition . setTaskType ( taskNode . getType ( ) . toUpperCase ( ) ) ;
taskDefinition . setTaskParams ( taskNode . getTaskParams ( ) ) ;
taskDefinition . setFlag ( taskNode . isForbidden ( ) ? Flag . NO : Flag . YES ) ;
taskDefinition . setTaskPriority ( taskNode . getTaskInstancePriority ( ) ) ;
taskDefinition . setWorkerGroup ( taskNode . getWorkerGroup ( ) ) ;
taskDefinition . setFailRetryTimes ( taskNode . getMaxRetryTimes ( ) ) ;
taskDefinition . setFailRetryInterval ( taskNode . getRetryInterval ( ) ) ;
taskDefinition . setTimeoutFlag ( taskNode . getTaskTimeoutParameter ( ) . getEnable ( ) ? TimeoutFlag . OPEN : TimeoutFlag . CLOSE ) ;
taskDefinition . setTimeoutNotifyStrategy ( taskNode . getTaskTimeoutParameter ( ) . getStrategy ( ) ) ;
taskDefinition . setTimeout ( taskNode . getTaskTimeoutParameter ( ) . getInterval ( ) ) ;
taskDefinition . setDelayTime ( taskNode . getDelayTime ( ) ) ;
taskDefinition . setResourceIds ( getResourceIds ( taskDefinition ) ) ;
}
/ * *
/ * *
* get resource ids
* get resource ids
*
*
@ -2179,174 +2136,6 @@ public class ProcessService {
return result & resultLog ;
return result & resultLog ;
}
}
/ * *
* save processDefinition ( including create or update processDefinition )
* /
@Deprecated
public int saveProcessDefinition ( User operator , Project project , String name , String desc , String locations ,
ProcessData processData , ProcessDefinition processDefinition , Boolean isFromProcessDefine ) {
ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog ( operator , processDefinition . getCode ( ) ,
name , processData , project , desc , locations ) ;
Map < String , TaskDefinition > taskDefinitionMap = handleTaskDefinition ( operator , project . getCode ( ) , processData . getTasks ( ) , isFromProcessDefine ) ;
if ( Constants . DEFINITION_FAILURE = = handleTaskRelation ( operator , project . getCode ( ) , processDefinitionLog , processData . getTasks ( ) , taskDefinitionMap ) ) {
return Constants . DEFINITION_FAILURE ;
}
return processDefinitionToDB ( processDefinition , processDefinitionLog , isFromProcessDefine ) ;
}
/ * *
* save processDefinition
* /
@Deprecated
public ProcessDefinitionLog insertProcessDefinitionLog ( User operator , Long processDefinitionCode , String processDefinitionName ,
ProcessData processData , Project project , String desc , String locations ) {
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog ( ) ;
Integer version = processDefineLogMapper . queryMaxVersionForDefinition ( processDefinitionCode ) ;
processDefinitionLog . setUserId ( operator . getId ( ) ) ;
processDefinitionLog . setCode ( processDefinitionCode ) ;
processDefinitionLog . setVersion ( version = = null | | version = = 0 ? 1 : version + 1 ) ;
processDefinitionLog . setName ( processDefinitionName ) ;
processDefinitionLog . setReleaseState ( ReleaseState . OFFLINE ) ;
processDefinitionLog . setProjectCode ( project . getCode ( ) ) ;
processDefinitionLog . setDescription ( desc ) ;
processDefinitionLog . setLocations ( locations ) ;
processDefinitionLog . setTimeout ( processData . getTimeout ( ) ) ;
processDefinitionLog . setTenantId ( processData . getTenantId ( ) ) ;
processDefinitionLog . setOperator ( operator . getId ( ) ) ;
Date now = new Date ( ) ;
processDefinitionLog . setOperateTime ( now ) ;
processDefinitionLog . setUpdateTime ( now ) ;
processDefinitionLog . setCreateTime ( now ) ;
//custom global params
List < Property > globalParamsList = new ArrayList < > ( ) ;
if ( CollectionUtils . isNotEmpty ( processData . getGlobalParams ( ) ) ) {
Set < Property > userDefParamsSet = new HashSet < > ( processData . getGlobalParams ( ) ) ;
globalParamsList = new ArrayList < > ( userDefParamsSet ) ;
}
processDefinitionLog . setGlobalParamList ( globalParamsList ) ;
processDefinitionLog . setFlag ( Flag . YES ) ;
int insert = processDefineLogMapper . insert ( processDefinitionLog ) ;
if ( insert > 0 ) {
return processDefinitionLog ;
}
return null ;
}
/ * *
* handle task definition
* /
@Deprecated
public Map < String , TaskDefinition > handleTaskDefinition ( User operator , Long projectCode , List < TaskNode > taskNodes , Boolean isFromProcessDefine ) {
if ( taskNodes = = null ) {
return null ;
}
Map < String , TaskDefinition > taskDefinitionMap = new HashMap < > ( ) ;
for ( TaskNode taskNode : taskNodes ) {
TaskDefinition taskDefinition = taskDefinitionMapper . queryByDefinitionCode ( taskNode . getCode ( ) ) ;
if ( taskDefinition = = null ) {
try {
long code = SnowFlakeUtils . getInstance ( ) . nextId ( ) ;
taskDefinition = new TaskDefinition ( ) ;
taskDefinition . setCode ( code ) ;
} catch ( SnowFlakeException e ) {
throw new ServiceException ( "Task code get error" , e ) ;
}
saveTaskDefinition ( operator , projectCode , taskNode , taskDefinition ) ;
} else {
if ( isFromProcessDefine & & isTaskOnline ( taskDefinition . getCode ( ) ) ) {
throw new ServiceException ( String . format ( "The task %s is on line in process" , taskNode . getName ( ) ) ) ;
}
updateTaskDefinition ( operator , projectCode , taskNode , taskDefinition ) ;
}
taskDefinitionMap . put ( taskNode . getName ( ) , taskDefinition ) ;
}
return taskDefinitionMap ;
}
/ * *
* handle task relations
* /
public int handleTaskRelation ( User operator ,
Long projectCode ,
ProcessDefinition processDefinition ,
List < TaskNode > taskNodes ,
Map < String , TaskDefinition > taskDefinitionMap ) {
if ( null = = processDefinition | | null = = taskNodes | | null = = taskDefinitionMap ) {
return Constants . DEFINITION_FAILURE ;
}
List < ProcessTaskRelation > processTaskRelationList = processTaskRelationMapper . queryByProcessCode ( projectCode , processDefinition . getCode ( ) ) ;
if ( ! processTaskRelationList . isEmpty ( ) ) {
processTaskRelationMapper . deleteByCode ( projectCode , processDefinition . getCode ( ) ) ;
}
List < ProcessTaskRelation > builderRelationList = new ArrayList < > ( ) ;
Date now = new Date ( ) ;
for ( TaskNode taskNode : taskNodes ) {
List < String > depList = taskNode . getDepList ( ) ;
if ( CollectionUtils . isNotEmpty ( depList ) ) {
for ( String preTaskName : depList ) {
builderRelationList . add ( new ProcessTaskRelation (
StringUtils . EMPTY ,
processDefinition . getVersion ( ) ,
projectCode ,
processDefinition . getCode ( ) ,
taskDefinitionMap . get ( preTaskName ) . getCode ( ) ,
taskDefinitionMap . get ( preTaskName ) . getVersion ( ) ,
taskDefinitionMap . get ( taskNode . getName ( ) ) . getCode ( ) ,
taskDefinitionMap . get ( taskNode . getName ( ) ) . getVersion ( ) ,
ConditionType . NONE ,
StringUtils . EMPTY ,
now ,
now ) ) ;
}
} else {
builderRelationList . add ( new ProcessTaskRelation (
StringUtils . EMPTY ,
processDefinition . getVersion ( ) ,
projectCode ,
processDefinition . getCode ( ) ,
0L , // this isn't previous task node, set zero
0 ,
taskDefinitionMap . get ( taskNode . getName ( ) ) . getCode ( ) ,
taskDefinitionMap . get ( taskNode . getName ( ) ) . getVersion ( ) ,
ConditionType . NONE ,
StringUtils . EMPTY ,
now ,
now ) ) ;
}
}
for ( ProcessTaskRelation processTaskRelation : builderRelationList ) {
saveTaskRelation ( operator , processTaskRelation ) ;
}
return 0 ;
}
public void saveTaskRelation ( User operator , ProcessTaskRelation processTaskRelation ) {
processTaskRelationMapper . insert ( processTaskRelation ) ;
// save process task relation log
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog ( processTaskRelation ) ;
processTaskRelationLog . setOperator ( operator . getId ( ) ) ;
processTaskRelationLog . setOperateTime ( new Date ( ) ) ;
processTaskRelationLogMapper . insert ( processTaskRelationLog ) ;
}
public int saveTaskDefinition ( User operator , Long projectCode , TaskNode taskNode , TaskDefinition taskDefinition ) {
Date now = new Date ( ) ;
taskDefinition . setProjectCode ( projectCode ) ;
taskDefinition . setUserId ( operator . getId ( ) ) ;
taskDefinition . setVersion ( 1 ) ;
taskDefinition . setUpdateTime ( now ) ;
taskDefinition . setCreateTime ( now ) ;
setTaskFromTaskNode ( taskNode , taskDefinition ) ;
// save the new task definition
int insert = taskDefinitionMapper . insert ( taskDefinition ) ;
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog ( taskDefinition ) ;
taskDefinitionLog . setOperator ( operator . getId ( ) ) ;
taskDefinitionLog . setOperateTime ( now ) ;
int logInsert = taskDefinitionLogMapper . insert ( taskDefinitionLog ) ;
return insert & logInsert ;
}
public boolean isTaskOnline ( Long taskCode ) {
public boolean isTaskOnline ( Long taskCode ) {
List < ProcessTaskRelation > processTaskRelationList = processTaskRelationMapper . queryByTaskCode ( taskCode ) ;
List < ProcessTaskRelation > processTaskRelationList = processTaskRelationMapper . queryByTaskCode ( taskCode ) ;
if ( ! processTaskRelationList . isEmpty ( ) ) {
if ( ! processTaskRelationList . isEmpty ( ) ) {
@ -2400,22 +2189,6 @@ public class ProcessService {
return taskDefinitionLogMapper . queryByTaskDefinitions ( taskDefinitionSet ) ;
return taskDefinitionLogMapper . queryByTaskDefinitions ( taskDefinitionSet ) ;
}
}
/ * *
* generate ProcessData
* it will be replaced by genDagData method
* /
@Deprecated
public ProcessData genProcessData ( ProcessDefinition processDefinition ) {
Map < String , String > locationMap = locationToMap ( processDefinition . getLocations ( ) ) ;
List < TaskNode > taskNodes = genTaskNodeList ( processDefinition . getCode ( ) , processDefinition . getVersion ( ) , locationMap ) ;
ProcessData processData = new ProcessData ( ) ;
processData . setTasks ( taskNodes ) ;
processData . setGlobalParams ( JSONUtils . toList ( processDefinition . getGlobalParams ( ) , Property . class ) ) ;
processData . setTenantId ( processDefinition . getTenantId ( ) ) ;
processData . setTimeout ( processDefinition . getTimeout ( ) ) ;
return processData ;
}
@Deprecated
@Deprecated
public List < TaskNode > genTaskNodeList ( Long processCode , int processVersion , Map < String , String > locationMap ) {
public List < TaskNode > genTaskNodeList ( Long processCode , int processVersion , Map < String , String > locationMap ) {
List < ProcessTaskRelationLog > processTaskRelations = processTaskRelationLogMapper . queryByProcessCodeAndVersion ( processCode , processVersion ) ;
List < ProcessTaskRelationLog > processTaskRelations = processTaskRelationLogMapper . queryByProcessCodeAndVersion ( processCode , processVersion ) ;
@ -2500,26 +2273,6 @@ public class ProcessService {
return taskDefinitionLogMapper . queryByTaskDefinitions ( taskDefinitionSet ) ;
return taskDefinitionLogMapper . queryByTaskDefinitions ( taskDefinitionSet ) ;
}
}
/ * *
* parse locations
*
* @param locations processDefinition locations
* @return key : taskName , value : taskId
* /
public Map < String , String > locationToMap ( String locations ) {
Map < String , String > frontTaskIdAndNameMap = new HashMap < > ( ) ;
if ( StringUtils . isBlank ( locations ) ) {
return frontTaskIdAndNameMap ;
}
ObjectNode jsonNodes = JSONUtils . parseObject ( locations ) ;
Iterator < Entry < String , JsonNode > > fields = jsonNodes . fields ( ) ;
while ( fields . hasNext ( ) ) {
Entry < String , JsonNode > jsonNodeEntry = fields . next ( ) ;
frontTaskIdAndNameMap . put ( JSONUtils . findValue ( jsonNodeEntry . getValue ( ) , "name" ) , jsonNodeEntry . getKey ( ) ) ;
}
return frontTaskIdAndNameMap ;
}
/ * *
/ * *
* add authorized resources
* add authorized resources
*
*