@ -23,7 +23,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_P
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID ;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID ;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS ;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS ;
@ -329,18 +329,6 @@ public class ProcessService {
return null ;
return null ;
}
}
/ * *
* check thread num
*
* @param command command
* @param validThreadNum validThreadNum
* @return if thread is enough
* /
private boolean checkThreadNum ( Command command , int validThreadNum ) {
int commandThreadCount = this . workProcessThreadNumCount ( command . getProcessDefinitionCode ( ) ) ;
return validThreadNum > = commandThreadCount ;
}
/ * *
/ * *
* insert one command
* insert one command
*
*
@ -412,8 +400,8 @@ public class ProcessService {
/ * *
/ * *
* get task node list by definitionId
* get task node list by definitionId
* /
* /
public List < TaskDefinition > getTaskNodeListByDefinitionId ( Integer defineId ) {
public List < TaskDefinition > getTaskNodeListByDefinition ( long defineCode ) {
ProcessDefinition processDefinition = processDefineMapper . selectById ( defineId ) ;
ProcessDefinition processDefinition = processDefineMapper . queryByCode ( defineCode ) ;
if ( processDefinition = = null ) {
if ( processDefinition = = null ) {
logger . error ( "process define not exists" ) ;
logger . error ( "process define not exists" ) ;
return new ArrayList < > ( ) ;
return new ArrayList < > ( ) ;
@ -535,38 +523,24 @@ public class ProcessService {
}
}
}
}
/ * *
* calculate sub process number in the process define .
*
* @param processDefinitionCode processDefinitionCode
* @return process thread num count
* /
private Integer workProcessThreadNumCount ( long processDefinitionCode ) {
ProcessDefinition processDefinition = processDefineMapper . queryByCode ( processDefinitionCode ) ;
List < Integer > ids = new ArrayList < > ( ) ;
recurseFindSubProcessId ( processDefinition . getId ( ) , ids ) ;
return ids . size ( ) + 1 ;
}
/ * *
/ * *
* recursive query sub process definition id by parent id .
* recursive query sub process definition id by parent id .
*
*
* @param parentId parentId
* @param parentCode parentCode
* @param ids ids
* @param ids ids
* /
* /
public void recurseFindSubProcessId ( int parentId , List < Integer > ids ) {
public void recurseFindSubProcess ( long parentCode , List < Long > ids ) {
List < TaskDefinition > taskNodeList = this . getTaskNodeListByDefinitionId ( parentId ) ;
List < TaskDefinition > taskNodeList = this . getTaskNodeListByDefinition ( parentCode ) ;
if ( taskNodeList ! = null & & ! taskNodeList . isEmpty ( ) ) {
if ( taskNodeList ! = null & & ! taskNodeList . isEmpty ( ) ) {
for ( TaskDefinition taskNode : taskNodeList ) {
for ( TaskDefinition taskNode : taskNodeList ) {
String parameter = taskNode . getTaskParams ( ) ;
String parameter = taskNode . getTaskParams ( ) ;
ObjectNode parameterJson = JSONUtils . parseObject ( parameter ) ;
ObjectNode parameterJson = JSONUtils . parseObject ( parameter ) ;
if ( parameterJson . get ( CMD_PARAM_SUB_PROCESS_DEFINE_ID ) ! = null ) {
if ( parameterJson . get ( CMD_PARAM_SUB_PROCESS_DEFINE_CODE ) ! = null ) {
SubProcessParameters subProcessParam = JSONUtils . parseObject ( parameter , SubProcessParameters . class ) ;
SubProcessParameters subProcessParam = JSONUtils . parseObject ( parameter , SubProcessParameters . class ) ;
ids . add ( subProcessParam . getProcessDefinitionId ( ) ) ;
ids . add ( subProcessParam . getProcessDefinitionCode ( ) ) ;
recurseFindSubProcessId ( subProcessParam . getProcessDefinitionId ( ) , ids ) ;
recurseFindSubProcess ( subProcessParam . getProcessDefinitionCode ( ) , ids ) ;
}
}
}
}
}
}
@ -678,7 +652,6 @@ public class ProcessService {
processInstance . setStartTime ( new Date ( ) ) ;
processInstance . setStartTime ( new Date ( ) ) ;
processInstance . setRunTimes ( 1 ) ;
processInstance . setRunTimes ( 1 ) ;
processInstance . setMaxTryTimes ( 0 ) ;
processInstance . setMaxTryTimes ( 0 ) ;
//processInstance.setProcessDefinitionId(command.getProcessDefinitionId());
processInstance . setCommandParam ( command . getCommandParam ( ) ) ;
processInstance . setCommandParam ( command . getCommandParam ( ) ) ;
processInstance . setCommandType ( command . getCommandType ( ) ) ;
processInstance . setCommandType ( command . getCommandType ( ) ) ;
processInstance . setIsSubProcess ( Flag . NO ) ;
processInstance . setIsSubProcess ( Flag . NO ) ;
@ -1307,8 +1280,8 @@ public class ProcessService {
TaskInstance task ) {
TaskInstance task ) {
CommandType commandType = getSubCommandType ( parentProcessInstance , childInstance ) ;
CommandType commandType = getSubCommandType ( parentProcessInstance , childInstance ) ;
Map < String , String > subProcessParam = JSONUtils . toMap ( task . getTaskParams ( ) ) ;
Map < String , String > subProcessParam = JSONUtils . toMap ( task . getTaskParams ( ) ) ;
int childDefineId = Integer . parseInt ( subProcessParam . get ( Constants . CMD_PARAM_SUB_PROCESS_DEFINE_ID ) ) ;
int childDefineCode = Integer . parseInt ( subProcessParam . get ( Constants . CMD_PARAM_SUB_PROCESS_DEFINE_CODE ) ) ;
ProcessDefinition subProcessDefinition = processDefineMapper . queryByDefineId ( childDefineId ) ;
ProcessDefinition subProcessDefinition = processDefineMapper . queryByCode ( childDefineCode ) ;
Object localParams = subProcessParam . get ( Constants . LOCAL_PARAMS ) ;
Object localParams = subProcessParam . get ( Constants . LOCAL_PARAMS ) ;
List < Property > allParam = JSONUtils . toList ( JSONUtils . toJsonString ( localParams ) , Property . class ) ;
List < Property > allParam = JSONUtils . toList ( JSONUtils . toJsonString ( localParams ) , Property . class ) ;